Prettify JSON with Python

If you are testing REST , it happens that you need to prettify JSON. With Python you do not need any other tool except your terminal!

Example

{"glossary":{"title":"example glossary","GlossDiv":{"title":"S","GlossList":{"GlossEntry":{"ID":"SGML","SortAs":"SGML","GlossTerm":"Standard Generalized Markup Language","Acronym":"SGML","Abbrev":"ISO 8879:1986","GlossDef":{"para":"A meta-markup language, used to create markup languages such as DocBook.","GlossSeeAlso":["GML","XML"]},"GlossSee":"markup"}}}}}

Now open you terminal…

# Using module json.tool to prettify and validate
$ cat example.json | python -m json.tool
{
    "glossary": {
        "GlossDiv": {
            "GlossList": {
                "GlossEntry": {
                    "Abbrev": "ISO 8879:1986",
                    "Acronym": "SGML",
                    "GlossDef": {
                        "GlossSeeAlso": [
                            "GML",
                            "XML"
                        ],
                        "para": "A meta-markup language, used to create markup languages such as DocBook."
                    },
                    "GlossSee": "markup",
                    "GlossTerm": "Standard Generalized Markup Language",
                    "ID": "SGML",
                    "SortAs": "SGML"
                }
            },
            "title": "S"
        },
        "title": "example glossary"
    }
}

JUnit report with Python Webdriver

To create JUnit reports, you just need the python library xmlrunner. This is needed for the integration with build server like Jenkins.

Installation

# example installation with pip
$ sudo pip install xmlrunner

Usage

Just replace for example:

unittest.TextTestRunner(verbosity=2).run(test_suite)

with

from xmlrunner import xmlrunner

""" code for test suite ... """

xmlrunner.XMLTestRunner(verbosity=2, output='reports').run(test_suite)

Jenkins

On Jenkins add a new “post-build” action and select the Publish JUnit test result report. Add now the string “reports/*.xml” into the “Test report XMLs” field.

DDT with Python Selenium

DDT (Data-driven Testing) with Python Selenium Webdriver is very easy! DDT becomes very useful if you have test cases that contains the same test steps. All values could outsourced into files or databases. This tutorial use CSV files.

Precondition

  • Python installed
  • selenium and ddt library installed

Example

The folder structure for this tutorial looks like:

├── data
│   └── scenario_a.csv
├── library
│   ├── GetData.py
│   └── __init__.py
├── scenarios
│   ├── __init__.py
│   └── scenario_a.py
└── testsuite.py

Into folder “data” we store the csv files. The packages “library” include a function to read the specific csv files and the package “scenarios” include the test cases. The test suite is on root folder.

#!/usr/bin/env python
# -*- coding: utf8 -*-

import unittest

from scenarios.scenario_a import TestScenarioA


# load test cases
scenario_a = unittest.TestLoader().loadTestsFromTestCase(TestScenarioA)

# create test suite
test_suite = unittest.TestSuite([scenario_a])

# execute test suite
unittest.TextTestRunner(verbosity=2).run(test_suite)

Into the “testsuite.py” we add all test cases provided by scenario package.

data folder

target_url,elem_name,search_value
http://softwaretester.info,s,python
http://softwaretester.info,s,selenium
http://softwaretester.info,s,webdriver
http://softwaretester.info,s,automation

The CSV stores the test data that we supplied to the @data decorator of test case.

library package

#!/usr/bin/env python
# -*- coding: utf8 -*-

__author__ = 'lupin'
#!/usr/bin/env python
# -*- coding: utf8 -*-

import csv


def get_csv_data(csv_path):
    """
    read test data from csv and return as list

    @type csv_path: string
    @param csv_path: some csv path string
    @return list
    """
    rows = []
    csv_data = open(str(csv_path), "rb")
    content = csv.reader(csv_data)

    # skip header line
    next(content, None)

    # add rows to list
    for row in content:
        rows.append(row)

    return rows

Just for read the csv and return the values as a list.

scenarios package

#!/usr/bin/env python
# -*- coding: utf8 -*-

__author__ = 'lupin'
#!/usr/bin/env python
# -*- coding: utf8 -*-

import unittest

from selenium import webdriver
from ddt import ddt, data, unpack

from library.GetData import get_csv_data


@ddt
class TestScenarioA(unittest.TestCase):
    """ inheriting the TestCase class"""

    @classmethod
    def setUpClass(cls):
        """test preparation"""
        cls.driver = webdriver.Firefox()
        cls.driver.implicitly_wait(3)
        cls.driver.set_window_size(450, 500)

    @data(*get_csv_data('./data/scenario_a.csv'))
    @unpack
    def test_search(self, target_url, elem_name, search_value):
        """test case for scenario a"""
        driver = self.driver
        driver.get(target_url)

        btn_elem = driver.find_element_by_id('search-toggle')
        btn_elem.click()

        input_elem = driver.find_element_by_name(elem_name)
        input_elem.clear()
        input_elem.send_keys(search_value)
        input_elem.submit()

    @classmethod
    def tearDownClass(cls):
        """clean up"""
        cls.driver.close()

Test case with @ddt (for classes), @data and @unpack (for methods) decorators.

  • @data take the arguments from csv file
  • @unpack unpacks tuples or lists into multiple arguments

The test_search() method accepts the arguments, which will be mapped to the tuple values by ddt.

Run

$ python -B testsuite.py

Python, Selenium and PhantomJS – ignore certificate errors

Background

You are done with your work and push all into Git. The Build-Server starts his work and all test scripts are failing. Short look and it is clear – certificate errors. The next example shows, how to ignore certificate errors on PhantomJS.

@classmethod
def setUpClass(cls):
    """starting point for test cases"""

    cls.driver = webdriver.PhantomJS(service_args=['--ignore-ssl-errors=true'])
    cls.driver.implicitly_wait(30)
    cls.driver.set_window_size(1024, 768)
    cls.driver.get(cls.url)

Now it should work….

Find out subdomains

One way of finding out subdomains are wordlists. Knockpy offers exactly this possibility! It is written in Python, easy to install and to use. The usage of own wordlists is possible, too. The output displayed in the terminal and saved in CSV file.

Precondition

  • Python installed

Installation

# install with pip
$ sudo pip install https://github.com/guelfoweb/knock/archive/knock3.zip

Usage

# usage with internal wordlist
$ knockpy domain.com

# usage with own wordlist
$ knockpy domain.com -w wordlist.txt

# resolve domain name
$ knockpy -r domain.com

# check zone transfer
$ knockpy -r domain.com

FTP Brute-force attack

As a penetration tester you may need to check your FTP Server(s). One possibilty is brute-force passwords to auditing. This tutorial show you how easy you can use Python to create such a tool.

Precondition

  • Python installed
  • Crunch installed (Tutorial)

Create Python Script

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import ftplib
import socket
import sys
from datetime import datetime


class FtpCrack(object):

    def __init__(self, host, username='', password=''):
        if not host:
            print "[*] Error: no host given"
            sys.exit(2)
        else:
            self.start = datetime.now()
            self.host = host
            self.username = username
            self.password = password
            self.ftp = None

    def _close_ftp_connection(self):
        self.ftp.quit()
        print "[*] Close FTP connection after ", datetime.now() - self.start

    def _list_ftp_directory(self):
        try:
            print "[*] List FTP directory content"
            self.ftp.dir()
        except ftplib.all_errors:
            print "[*] ERROR: Cannot list content"
        self._close_ftp_connection()

    def ftp_connect(self):
        try:
            self.ftp = ftplib.FTP(self.host)
        except (socket.error, socket.gaierror) as err:
            print "[*] Cannot connect to %s" % self.host
            print "[*] Error %s" % err
            sys.exit(2)

        print "[*] Connected to %s" % self.host

    def _ftp_anonymous_login(self):
        try:
            self.ftp.login()
        except ftplib.error_perm:
            print "[*] ERROR: cannot login anonymously"
            self._close_ftp_connection()
            sys.exit(2)

        print "[*] Anonymous login"
        self._list_ftp_directory()

    def _ftp_credential_login(self):
        print "[*] User: %s - Password: %s" % (self.username, self.password)
        try:
            self.ftp.login(self.username, self.password)
        except ftplib.error_perm:
            print "[*] ERROR: wrong credentials"
            self._close_ftp_connection()
            sys.exit(2)

        print "[*] Login with credentials"
        self._list_ftp_directory()

    def ftp_login(self):
        if not self.username or not self.password:
            self._ftp_anonymous_login()
        else:
            self._ftp_credential_login()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Brute-force FTP')
    parser.add_argument('host', help='target host or ip')
    parser.add_argument('-u', '--usr', help='login user name')
    parser.add_argument('-p', '--pwd', help='login password')
    args = parser.parse_args()

    RUN = FtpCrack(args.host, args.usr, args.pwd)
    RUN.ftp_connect()
    RUN.ftp_login()

The code should be clear and self-explanatory.

Usage examples

# show help
$ ./FtpCrack.py -h

# example anonymous ftp
$ ./FtpCrack.py <host>

# example with credentials
$ ./FtpCrack.py <host> -u <user> -p <password>

# example crunch (pipe to password)
$ ./crunch 3 3 abc | xargs -I password ./FtpCrack.py <host> -u <user> -p password

You can extend the code, for example to read the content from wordlists.

HAR with Python WebDriver and BrowserMob Proxy

This time is shown how to automate performance test for web sites. Tools in usage are Python Selenium WebDriver and BrowserMob proxy. The results are HAR files which can be viewed in HAR Viewer.

Precondition

  • JAVA installed
  • Python Packages for selenium and browsermob-proxy
selenium
browsermob-proxy

Preparation

Download BrowserMob Proxy and check if proxy can started by command-line.

# change to 
$ cd browsermob-proxy-2.1.0-beta-1/bin/

# start proxy on port 9090
$ ./browsermob-proxy -port 9090

Create Python Class

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python - BrowserMob - WebDriver"""
from browsermobproxy import Server
from selenium import webdriver
import json


class CreateHar(object):
    """create HTTP archive file"""

    def __init__(self, mob_path):
        """initial setup"""
        self.browser_mob = mob_path
        self.server = self.driver = self.proxy = None

    @staticmethod
    def __store_into_file(title, result):
        """store result"""
        har_file = open(title + '.har', 'w')
        har_file.write(str(result))
        har_file.close()

    def __start_server(self):
        """prepare and start server"""
        self.server = Server(self.browser_mob)
        self.server.start()
        self.proxy = self.server.create_proxy()

    def __start_driver(self):
        """prepare and start driver"""
        profile = webdriver.FirefoxProfile()
        profile.set_proxy(self.proxy.selenium_proxy())
        self.driver = webdriver.Firefox(firefox_profile=profile)

    def start_all(self):
        """start server and driver"""
        self.__start_server()
        self.__start_driver()

    def create_har(self, title, url):
        """start request and parse response"""
        self.proxy.new_har(title)
        self.driver.get(url)
        result = json.dumps(self.proxy.har, ensure_ascii=False)
        self.__store_into_file(title, result)

    def stop_all(self):
        """stop server and driver"""
        self.server.stop()
        self.driver.quit()


if __name__ == '__main__':
    path = "browsermob-proxy-2.1.0-beta-1/bin/browsermob-proxy"
    RUN = CreateHar(path)
    RUN.start_all()
    RUN.create_har('google', 'http://google.com')
    RUN.create_har('stackoverflow', 'http://stackoverflow.com')
    RUN.stop_all()

Note: The highlighted line must contain the path to BrowserMob Proxy!
Happy testing! If you use PhantomJS instead of Firefox, you can use this on Build server like Jenkins/Hudson and so on, too.

Create information gathering test application

It is time again for an extensive tutorial. This time, a tiny test application for passive and active information gathering. After the instruction you are welcome to improve the application with more features! Okay let’s start…

What should it do?

The security tester selects a information gathering method first. As second step the testers insert the URL or IP in a testfield and press a button. The result should printed out in a text area. The GUI should look like this:

Sensei Mockup

How it is implemented?

The prefered language is Python 2.7. So it is portable to different OS and for the most of methods are already packages available. The GUI is done with Tkinter. Tkinter provides all objects which are needed as widgets and ranges for this scope out completely. The file and folder structure look like:

├── essential
│   ├── __init__.py
│   └── timestop.py
├── gathering
│   ├── __init__.py
│   ├── geolocation.py
│   ├── icmpping.py
│   ├── information.py
│   ├── wappalyzer.py
│   └── whoisgathering.py
├── requirements.txt
└── sensei.py

File content

Files in root directory:

requests
python-Wappalyzer
python-whois
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
from urlparse import urlparse
from gathering import IcmpPing, WhoisGathering, GeoLocation, WappAlyzer
from Tkinter import (Tk, Frame, StringVar, OptionMenu, Entry, Button, Text,
                     W, E, END, FALSE)


class Sensei(object):

    def __init__(self):
        self.root = Tk()
        self.root.title('Sensei')
        self.root.resizable(width=FALSE, height=FALSE)
        self.select = [
            'ICMP Ping', 'Whois', 'GeoLocation', 'Wappalyzer'
        ]
        self.option = StringVar(self.root)
        self.option.set(self.select[0])
        self.method = None
        self.url = None
        self.result = None

    def create_gui(self):
        self._top_frame()
        self._output_frame()
        self.root.mainloop()

    def quite_app(self):
        self.root.quit()

    def _start_request(self):
        self.result.delete(1.0, END)
        action = None
        target = self.url.get()
        method = self.option.get()
        if method == 'ICMP Ping':
            action = IcmpPing()
        elif method == 'Whois':
            action = WhoisGathering()
        elif method == 'GeoLocation':
            action = GeoLocation()
        elif method == 'Wappalyzer':
            action = WappAlyzer()
        if target:
            action.set_target(target)
            action.do_request()
            value = action.get_result()
        else:
            value = 'Internal Error'
        self.result.insert(END, value)

    def check_protocol(self, value):
        target = self.url.get()
        if value == 'Wappalyzer':
            if 'http' not in target:
                self.url.insert(0, 'http://')
        else:
            if 'http' in target:
                new_target = urlparse(target)
                self.url.delete(0, END)
                self.url.insert(0, new_target[1])

    def _top_frame(self):
        top_frame = Frame(self.root)
        top_frame.grid(column=0, row=0, sticky=W+E)
        self.method = OptionMenu(
            top_frame, self.option, *self.select, command=self.check_protocol
        )
        self.method.config(width=15)
        self.method.grid(column=0, row=0)
        self.url = Entry(top_frame, width=50)
        self.url.grid(column=1, row=0)
        Button(top_frame, text='Request', command=self._start_request).grid(
            column=2, row=0)
        Button(top_frame, text='Close', command=self.quite_app).grid(
            column=3, row=0)

    def _output_frame(self):
        output_frame = Frame(self.root)
        output_frame.grid(column=0, row=2, sticky=W+E)
        self.result = Text(output_frame, height=15)
        self.result.grid(column=0, row=0)


if __name__ == '__main__':
    RUN = Sensei()
    RUN.create_gui()

Files in essential:

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
from essential.timestop import TimeStop
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
from datetime import datetime


class TimeStop(object):
    """missing docstring"""

    __start = None

    @classmethod
    def start_measure(cls):
        cls.__start = datetime.now()

    @classmethod
    def stop_measure(cls):
        stop = datetime.now()
        total = stop - cls.__start
        return ">>>> request complete in " + str(total)

Files in gathering:

#!/usr/bin/env python
#  -*- coding: utf-8 -*-

from gathering.information import InformationGathering
from gathering.icmpping import IcmpPing
from gathering.whoisgathering import WhoisGathering
from gathering.geolocation import GeoLocation
from gathering.wappalyzer import WappAlyzer
#!/usr/bin/env python
#  -*- coding: utf-8 -*-


class InformationGathering(object):

    def __init__(self):
        self.errors = 0
        self.target = ''
        self.result = ''

    def set_target(self, target):
        victim = target.strip(' \t\n\r')
        if not victim:
            self.errors += 1
            self.result = 'No target given!'
        else:
            self.target = target

    def get_result(self):
        return self.result
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
from gathering.information import InformationGathering
from essential.timestop import TimeStop
import os
import platform


class IcmpPing(InformationGathering):

    COMMAND = ''

    def __create_command(self):
        operation_system = platform.system()
        if operation_system == "Windows":
            self.COMMAND = "ping -n 1 "
        elif operation_system == "Linux":
            self.COMMAND = "ping -c 1 "
        else:
            self.COMMAND = "ping -c 1 "

    def do_request(self):
        if self.errors == 0:
            self.__create_command()
            command = self.COMMAND + self.target
            TimeStop.start_measure()
            response = os.popen(command)
            for line in response.readlines():
                self.result += line
            self.result += TimeStop.stop_measure()
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
from gathering.information import InformationGathering
from essential.timestop import TimeStop
import whois


class WhoisGathering(InformationGathering):

    def do_request(self):
        if self.errors == 0:
            TimeStop.start_measure()
            result = whois.whois(self.target)
            self.result += result.text
            self.result += TimeStop.stop_measure()
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
from gathering.information import InformationGathering
from essential.timestop import TimeStop
import requests


class GeoLocation(InformationGathering):

    API = 'http://ip-api.com/json/'

    def do_request(self):
        if self.errors == 0:
            target = self.API + self.target
            TimeStop.start_measure()
            response = requests.get(target)
            output = response.json()
            for key, val in output.items():
                if val:
                    self.result += str(key) + " => " + str(val) + "\n"
            self.result += TimeStop.stop_measure()
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
from gathering.information import InformationGathering
from essential.timestop import TimeStop
from Wappalyzer import Wappalyzer, WebPage


class WappAlyzer(InformationGathering):

    def do_request(self):
        if self.errors == 0:
            TimeStop.start_measure()
            wappalyzer = Wappalyzer.latest()
            website = WebPage.new_from_url(self.target)
            output = wappalyzer.analyze(website)
            for val in output:
                self.result += " => " + str(val) + "\n"
            self.result += TimeStop.stop_measure()

That was it. The result looks like this:

Sensei

Improve it with your ideas!