Remove exit on swagger errros, add test case
Signed-off-by: bretfourbe <gwendal@cyberwatch.fr>
This commit is contained in:
parent
9f93179715
commit
2cf12d577a
|
@ -294,10 +294,9 @@ async def test_swagger_invalid_url(mock_browse):
|
|||
]
|
||||
|
||||
with mock.patch.object(sys, "argv", testargs):
|
||||
# will raise an exception because the url is not valid
|
||||
with pytest.raises(SystemExit):
|
||||
await wapiti_main()
|
||||
mock_browse.assert_called_once()
|
||||
# will not raise an exception because of the invalid url
|
||||
await wapiti_main()
|
||||
mock_browse.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
info:
|
||||
description: Vulnerable API
|
||||
title: Vulnerable API
|
||||
version: '0.1'
|
||||
openapi: 3.0.1
|
||||
paths:
|
||||
/:
|
||||
get:
|
||||
description: Description
|
||||
operationId: get_description
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
text/html:
|
||||
schema:
|
||||
type: string
|
||||
description: HTML content generated successfully
|
||||
summary: Generate HTML content
|
||||
x-openapi-router-controller: vAPI
|
||||
/eval:
|
||||
get:
|
||||
description: Evaluate a string
|
||||
operationId: evaluate_str
|
||||
parameters:
|
||||
- example: THISISATOKEN
|
||||
in: header
|
||||
name: X-Auth-Token
|
||||
schema:
|
||||
type: string
|
||||
- in: query
|
||||
name: s
|
||||
schema:
|
||||
type: string
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: string
|
||||
description: Successful command
|
||||
x-openapi-router-controller: vAPI
|
||||
/help:
|
||||
get:
|
||||
description: Description
|
||||
operationId: get_help
|
||||
responses:
|
||||
'200':
|
||||
content:
|
||||
text/html:
|
||||
schema:
|
||||
type: string
|
||||
description: HTML content generated successfully
|
||||
summary: Generate HTML help content
|
||||
x-openapi-router-controller: vAPI
|
||||
servers:
|
||||
- url: ''
|
|
@ -141,3 +141,13 @@ def test_openapi_file():
|
|||
|
||||
for item in list_request:
|
||||
assert item in requests
|
||||
|
||||
def test_openapi_yaml_file():
|
||||
url = "tests/data/openapi.yaml"
|
||||
page = Swagger(base_url="https://fake.openapi.fr", swagger_url=url)
|
||||
|
||||
assert{
|
||||
"https://fake.openapi.fr/",
|
||||
"https://fake.openapi.fr/eval?s=default",
|
||||
"https://fake.openapi.fr/help"
|
||||
} == {x.url for x in page.get_requests()}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import json
|
||||
import sys
|
||||
import urllib.parse
|
||||
from prance import ResolvingParser, ValidationError
|
||||
from prance.util.formats import ParseError
|
||||
from prance.util.url import ResolutionError
|
||||
|
||||
from wapitiCore.net import Request
|
||||
|
@ -20,6 +20,7 @@ class Swagger():
|
|||
}
|
||||
|
||||
swagger_dict = None
|
||||
routes = None
|
||||
|
||||
|
||||
def __init__(self, swagger_url: str = None, base_url: str = None) -> None:
|
||||
|
@ -27,21 +28,20 @@ class Swagger():
|
|||
try:
|
||||
self.swagger_dict = ResolvingParser(swagger_url, backend='openapi-spec-validator',
|
||||
strict=False, recursion_limit=5).specification
|
||||
except ParseError as e:
|
||||
logging.error("[-] Error: Swagger file format invalid : " + str(e.args[0]))
|
||||
except ValidationError as e:
|
||||
logging.error("[-] Error: Swagger file is not valid : " + str(e.args[0]) +
|
||||
". See https://swagger.io/specification/ for more information.")
|
||||
sys.exit(1)
|
||||
except AssertionError:
|
||||
logging.error("[-] Error: File not found")
|
||||
sys.exit(1)
|
||||
except ResolutionError:
|
||||
logging.error("[-] Error: Unable to resolve the swagger file")
|
||||
sys.exit(1)
|
||||
else:
|
||||
logging.error("[-] Error: No URL or file")
|
||||
sys.exit(1)
|
||||
|
||||
self.routes = self._get_routes(self.swagger_dict, swagger_url, base_url)
|
||||
if self.swagger_dict:
|
||||
self.routes = self._get_routes(self.swagger_dict, swagger_url, base_url)
|
||||
|
||||
|
||||
@staticmethod
|
||||
|
@ -68,8 +68,9 @@ class Swagger():
|
|||
except ValueError as e:
|
||||
logging.error("[-] Error: Swagger file is not valid : " + str(e) +
|
||||
". See https://swagger.io/specification/ for more information.")
|
||||
sys.exit(1)
|
||||
|
||||
base_url = parsed_host._replace(path="").geturl()
|
||||
swagger_dict["basePath"] = base_url
|
||||
return base_url
|
||||
|
||||
@staticmethod
|
||||
def _check_properties(model_name: dict) -> dict:
|
||||
|
@ -85,9 +86,9 @@ class Swagger():
|
|||
# acording to their properties and definitions.
|
||||
# It will be easier to create request with default value.
|
||||
def _parse_object(self, model_name):
|
||||
try:
|
||||
model = {}
|
||||
for key in model_name:
|
||||
model = {}
|
||||
for key in model_name:
|
||||
try:
|
||||
if 'type' in model_name[key]:
|
||||
if 'object' in model_name[key]['type']:
|
||||
ref = self._check_properties(model_name[key])
|
||||
|
@ -107,11 +108,10 @@ class Swagger():
|
|||
model[key] = model_name[key]['type']
|
||||
else:
|
||||
model[key] = model_name[key]
|
||||
return model
|
||||
except ValueError as e:
|
||||
logging.error("[-] Error: Swagger file is not valid\n" + str(e) +
|
||||
"\nSee https://swagger.io/specification/ for more information")
|
||||
sys.exit(1)
|
||||
except ValueError as e:
|
||||
logging.error("[-] Error: Swagger file is not valid : " + str(e) +
|
||||
"\nSee https://swagger.io/specification/ for more information")
|
||||
return model
|
||||
|
||||
|
||||
def _check_params(self, params: dict) -> list:
|
||||
|
@ -168,17 +168,17 @@ class Swagger():
|
|||
url = swagger_url
|
||||
else:
|
||||
url = base_url
|
||||
try:
|
||||
request = {}
|
||||
base_path = self._get_base_url(swagger_dict, url)
|
||||
for path in swagger_dict['paths']:
|
||||
for method in swagger_dict['paths'][path]:
|
||||
if method == "parameters":
|
||||
path = self._parse_parameters(swagger_dict['paths'][path][method], path)
|
||||
continue
|
||||
route = method.upper() + " " + base_path + path
|
||||
params = self._get_parameters(swagger_dict, route, url)
|
||||
request[route] = []
|
||||
request = {}
|
||||
base_path = self._get_base_url(swagger_dict, url)
|
||||
for path in swagger_dict['paths']:
|
||||
for method in swagger_dict['paths'][path]:
|
||||
if method == "parameters":
|
||||
path = self._parse_parameters(swagger_dict['paths'][path][method], path)
|
||||
continue
|
||||
route = method.upper() + " " + base_path + path
|
||||
params = self._get_parameters(swagger_dict, route, url)
|
||||
request[route] = []
|
||||
try:
|
||||
if params:
|
||||
request_route = {"method": method.upper(), "route": route.replace(method.upper() + ' ', '')}
|
||||
request_route['params'] = []
|
||||
|
@ -189,14 +189,15 @@ class Swagger():
|
|||
else:
|
||||
request_route = {"method": method.upper(), "route": route.replace(method.upper() + ' ', '')}
|
||||
request[route].append(request_route)
|
||||
return request
|
||||
except KeyError as e:
|
||||
logging.error("[-] Error: " + str(e))
|
||||
sys.exit(1)
|
||||
except KeyError as e:
|
||||
logging.error("[-] Error: " + str(e))
|
||||
return request
|
||||
|
||||
|
||||
def _parse_parameters(self, params: list, route: str) -> str:
|
||||
for param in params:
|
||||
if not "type" in param:
|
||||
param["type"] = "string"
|
||||
if "in" in param:
|
||||
if param['in'] == "path":
|
||||
route = route.replace("{" + param['name'] + "}", self.AUTOFILL_VALUES[param['type']])
|
||||
|
@ -259,9 +260,12 @@ class Swagger():
|
|||
elif self.swagger_dict['host']:
|
||||
route_parsed = route.split(self.swagger_dict['host'])[1]
|
||||
else:
|
||||
print("todo")
|
||||
# we will use the target url from -u option
|
||||
pass
|
||||
method = route.split(' ')[0].lower()
|
||||
param = self.swagger_dict['paths'][route_parsed][method]['parameters'][0]['schema']
|
||||
if not 'type' in param:
|
||||
param['type'] = "string"
|
||||
if 'enum' in param:
|
||||
option += param['enum'][0]
|
||||
elif 'array' in param:
|
||||
|
@ -282,50 +286,58 @@ class Swagger():
|
|||
elif self.swagger_dict['host']:
|
||||
route_parsed = route.split(self.swagger_dict['host'])[1]
|
||||
else:
|
||||
print("todo")
|
||||
# we will use the target url from -u option
|
||||
pass
|
||||
method = route.split(' ')[0].lower()
|
||||
if not 'type' in param:
|
||||
param = self.swagger_dict['paths'][route_parsed][method]['parameters'][0]['schema']
|
||||
if not 'type' in param:
|
||||
# if still missing, we add a default type string
|
||||
param['type'] = "string"
|
||||
return url.replace("{" + name + "}", self.AUTOFILL_VALUES[param['type']])
|
||||
|
||||
|
||||
def _transform_body(self, param: dict) -> str:
|
||||
json_dict = {}
|
||||
for key in param['model']:
|
||||
if 'array' in param['model'][key]:
|
||||
json_dict[key] = self._transform_array(param['model'][key])
|
||||
elif isinstance(param['model'][key], dict):
|
||||
json_dict[key] = self._replace_param(param['model'][key])
|
||||
else:
|
||||
json_dict[key] = self.AUTOFILL_VALUES[param['model'][key]]
|
||||
if param['model']:
|
||||
for key in param['model']:
|
||||
if 'array' in param['model'][key]:
|
||||
json_dict[key] = self._transform_array(param['model'][key])
|
||||
elif isinstance(param['model'][key], dict):
|
||||
json_dict[key] = self._replace_param(param['model'][key])
|
||||
else:
|
||||
json_dict[key] = self.AUTOFILL_VALUES[param['model'][key]]
|
||||
return json.dumps(json_dict)
|
||||
|
||||
|
||||
def _transform_formData(self, param: dict, files: list) -> str:
|
||||
data = ""
|
||||
if 'enum' in param['type']:
|
||||
data = self._add_data(data, param['name'], param['type']['enum'][0])
|
||||
elif 'array' in param['type']:
|
||||
data = self._add_data(data, param['name'], "[" + self.AUTOFILL_VALUES[param['type']['array']] + "]")
|
||||
else:
|
||||
if param['type'] == "file":
|
||||
files.append([param['name'], self.AUTOFILL_VALUES[param['type']]])
|
||||
if 'type' in param:
|
||||
if 'enum' in param['type']:
|
||||
data = self._add_data(data, param['name'], param['type']['enum'][0])
|
||||
elif 'array' in param['type']:
|
||||
data = self._add_data(data, param['name'], "[" + self.AUTOFILL_VALUES[param['type']['array']] + "]")
|
||||
else:
|
||||
data = self._add_data(data, param['name'], self.AUTOFILL_VALUES[param['type']])
|
||||
if param['type'] == "file":
|
||||
files.append([param['name'], self.AUTOFILL_VALUES[param['type']]])
|
||||
else:
|
||||
data = self._add_data(data, param['name'], self.AUTOFILL_VALUES[param['type']])
|
||||
return data
|
||||
|
||||
|
||||
# create request with default value from swagger file
|
||||
def _create_request(self, routes: dict) -> list[Request]:
|
||||
requests_list = []
|
||||
for route in routes:
|
||||
url = routes[route][0]['route']
|
||||
for route, urls in routes.items():
|
||||
if not urls:
|
||||
continue
|
||||
url = urls[0]['route']
|
||||
data = ""
|
||||
header = {}
|
||||
option = ""
|
||||
files = []
|
||||
if 'params' in routes[route][0]:
|
||||
for param in routes[route][0]['params']:
|
||||
if 'params' in urls[0]:
|
||||
for param in urls[0]['params']:
|
||||
if 'in' in param:
|
||||
if param['in'] == "path":
|
||||
url = self._transform_url(param, url, route)
|
||||
|
@ -336,8 +348,10 @@ class Swagger():
|
|||
elif param['in'] == "formData":
|
||||
data = self._transform_formData(param, files)
|
||||
elif param['in'] == "header":
|
||||
if not 'type' in param:
|
||||
param["type"] = "string"
|
||||
header[param['name']] = self.AUTOFILL_VALUES[param['type']]
|
||||
request = Request(path=url+option, method=routes[route][0]['method'], post_params=data, file_params=files)
|
||||
request = Request(path=url+option, method=urls[0]['method'], post_params=data, file_params=files)
|
||||
request.set_headers(header)
|
||||
requests_list.append(request)
|
||||
return requests_list
|
||||
|
@ -373,4 +387,6 @@ class Swagger():
|
|||
return json_dict
|
||||
|
||||
def get_requests(self) -> list[Request]:
|
||||
return self._create_request(self.routes)
|
||||
if self.routes:
|
||||
return self._create_request(self.routes)
|
||||
return []
|
||||
|
|
Loading…
Reference in New Issue