scripts/recon_scanner.py

433 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""
DeadHydra Recon Scanner
Advanced Penetration Testing Reconnaissance Tool
Author: DeadHydra Collective
"""
import argparse
import socket
import subprocess
import sys
import json
import dns.resolver
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import ipaddress
import re
from urllib.parse import urlparse
class Colors:
    """ANSI escape sequences used to colour and style terminal output."""

    # Reset: emitted after every styled span to restore the default look.
    ENDC = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Bright foreground colours.
    FAIL = "\033[91m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    OKBLUE = "\033[94m"
    HEADER = "\033[95m"
    OKCYAN = "\033[96m"
class ReconScanner:
    """Collect basic reconnaissance data about a single host or domain.

    Every scan stage prints its findings and records them in ``self.results``,
    a JSON-serialisable dict that ``save_results`` can write to disk.  Stages
    are best-effort: a failure in one stage is reported and the scan moves on.
    Only run this against systems you are authorised to test.
    """

    # Ports probed when the caller does not supply an explicit list.
    DEFAULT_PORTS = (21, 22, 23, 25, 53, 80, 110, 111, 135, 139, 143, 443, 445,
                     993, 995, 1723, 3306, 3389, 5900, 8080, 8443, 8888)

    # Ports treated as HTTP(S) endpoints, and the subset assumed to speak TLS.
    WEB_PORTS = (80, 443, 8080, 8443, 8888)
    TLS_PORTS = (443, 8443)

    # Port -> conventional service name (a guess by port number only).
    SERVICE_NAMES = {
        21: 'FTP', 22: 'SSH', 23: 'Telnet', 25: 'SMTP', 53: 'DNS',
        80: 'HTTP', 110: 'POP3', 111: 'RPCBind', 135: 'MSRPC', 139: 'NetBIOS',
        143: 'IMAP', 443: 'HTTPS', 445: 'SMB', 993: 'IMAPS', 995: 'POP3S',
        1723: 'PPTP', 3306: 'MySQL', 3389: 'RDP', 5900: 'VNC',
        8080: 'HTTP-Proxy', 8443: 'HTTPS-Alt', 8888: 'HTTP-Alt',
    }

    # Labels tried when no subdomain wordlist is given.
    DEFAULT_SUBDOMAINS = (
        'www', 'mail', 'ftp', 'localhost', 'webmail', 'smtp', 'pop', 'ns1',
        'webdisk', 'ns2', 'cpanel', 'whm', 'autodiscover', 'autoconfig',
        'dev', 'staging', 'test', 'api', 'admin', 'portal', 'vpn',
        'blog', 'shop', 'store', 'app', 'mobile', 'cdn', 'static',
    )

    # DNS record types queried by dns_enumeration().
    DNS_RECORD_TYPES = ('A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA', 'CNAME')

    # (display name, lowercase substring looked for in the response body).
    # Plain substring matching is a coarse heuristic and can produce false
    # positives (e.g. "vue" inside an unrelated word).
    TECH_SIGNATURES = (
        ('WordPress', 'wordpress'),
        ('Joomla', 'joomla'),
        ('Drupal', 'drupal'),
        ('React', 'react'),
        ('Angular', 'angular'),
        ('Vue.js', 'vue'),
        ('jQuery', 'jquery'),
        ('Bootstrap', 'bootstrap'),
    )

    # (label printed to the user, pattern applied to raw WHOIS output).
    WHOIS_FIELDS = (
        ('Registrar', re.compile(r'Registrar: (.+)')),
        ('Created', re.compile(r'Creation Date: (.+)')),
        ('Expires', re.compile(r'Expir(?:y|ation) Date: (.+)')),
    )

    def __init__(self, target, output_file=None):
        """Create a scanner for one target.

        Args:
            target: Hostname, IPv4 literal or IPv6 literal to scan.
            output_file: Optional path the JSON report is written to by
                ``save_results``; ``None`` disables saving.
        """
        self.target = target
        self.output_file = output_file
        self.results = {
            'target': target,
            'scan_time': datetime.now().isoformat(),
            'ip_addresses': [],
            'open_ports': [],
            'services': [],
            'subdomains': [],
            'dns_records': {},
            'web_technologies': {},
            'ssl_info': {},
            # Filled by whois_lookup(); declared here so the report schema
            # is the same whether or not the lookup succeeds.
            'whois': None,
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _target_ip(self):
        """Return the target as an ``ipaddress`` object, or ``None`` if it is a hostname."""
        try:
            return ipaddress.ip_address(self.target)
        except ValueError:
            return None

    def _url_host(self):
        """Return the target formatted for a URL / Host header (IPv6 literals bracketed)."""
        ip = self._target_ip()
        if ip is not None and ip.version == 6:
            return f"[{ip}]"
        return self.target

    @staticmethod
    def _clean_wordlist(lines):
        """Strip wordlist lines, dropping blanks, ``#`` comments and duplicates (order kept)."""
        labels = (line.strip() for line in lines)
        return list(dict.fromkeys(
            label for label in labels if label and not label.startswith('#')
        ))

    # ------------------------------------------------------------------
    # Scan stages
    # ------------------------------------------------------------------
    def banner(self):
        """Display scanner banner"""
        art = f"""
{Colors.OKGREEN}
╔═══════════════════════════════════════════════════════════╗
║ ║
║ ██████╗ ███████╗ █████╗ ██████╗ ██╗ ██╗ ║
║ ██╔══██╗██╔════╝██╔══██╗██╔══██╗██║ ██║ ║
║ ██║ ██║█████╗ ███████║██║ ██║███████║ ║
║ ██║ ██║██╔══╝ ██╔══██║██║ ██║██╔══██║ ║
║ ██████╔╝███████╗██║ ██║██████╔╝██║ ██║ ║
║ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ║
║ ║
║ DeadHydra Recon Scanner v1.0 ║
║ Advanced Penetration Testing Tool ║
║ ║
╚═══════════════════════════════════════════════════════════╝
{Colors.ENDC}
{Colors.OKCYAN}[*] Target: {self.target}
[*] Scan Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{Colors.ENDC}
"""
        print(art)

    def resolve_target(self):
        """Resolve the target and store its addresses in ``results['ip_addresses']``.

        An IP literal is recorded as-is; a hostname is resolved to its IPv4
        addresses.  Terminates the process with exit status 1 when the
        hostname cannot be resolved.
        """
        print(f"\n{Colors.BOLD}[+] Resolving Target...{Colors.ENDC}")

        ip = self._target_ip()
        if ip is not None:
            self.results['ip_addresses'].append(str(ip))
            print(f"{Colors.OKGREEN} [✓] IP Address: {ip}{Colors.ENDC}")
            return

        try:
            ip_addresses = socket.gethostbyname_ex(self.target)[2]
        except (OSError, UnicodeError):
            # OSError covers gaierror/herror; UnicodeError is raised for
            # names the IDNA codec rejects (empty or over-long labels).
            ip_addresses = []

        if not ip_addresses:
            print(f"{Colors.FAIL} [✗] Failed to resolve target{Colors.ENDC}")
            sys.exit(1)

        for address in ip_addresses:
            self.results['ip_addresses'].append(address)
            print(f"{Colors.OKGREEN} [✓] Resolved: {address}{Colors.ENDC}")

    def port_scan(self, ports=None, threads=100):
        """TCP-connect scan of the first resolved address.

        Args:
            ports: Iterable of port numbers; ``None`` uses ``DEFAULT_PORTS``.
            threads: Maximum number of concurrent connection attempts.

        Open ports are appended to ``results['open_ports']``, which is left
        sorted in ascending order.
        """
        print(f"\n{Colors.BOLD}[+] Port Scanning...{Colors.ENDC}")
        if ports is None:
            ports = self.DEFAULT_PORTS

        if not self.results['ip_addresses']:
            print(f"{Colors.WARNING} [!] No resolved IP address, skipping port scan{Colors.ENDC}")
            return
        host = self.results['ip_addresses'][0]

        def scan_port(port):
            """Return ``port`` if a TCP connection succeeds within 1s, else ``None``."""
            try:
                # create_connection picks the right address family (IPv4 or
                # IPv6) and the context manager always closes the socket.
                with socket.create_connection((host, port), timeout=1):
                    return port
            except (OSError, OverflowError, ValueError):
                # Closed/filtered port, or a port number outside 0-65535.
                return None

        with ThreadPoolExecutor(max_workers=threads) as executor:
            pending = [executor.submit(scan_port, port) for port in ports]
            for future in as_completed(pending):
                port = future.result()
                if port is None:
                    continue
                self.results['open_ports'].append(port)
                service = self.identify_service(port)
                print(f"{Colors.OKGREEN} [✓] Port {port}/tcp open - {service}{Colors.ENDC}")

        # Futures finish in arbitrary order; sort so the report is deterministic.
        self.results['open_ports'].sort()

    def identify_service(self, port):
        """Return the conventional service name for ``port`` or ``'Unknown'``."""
        return self.SERVICE_NAMES.get(port, 'Unknown')

    def banner_grab(self):
        """Read up to 1 KiB of greeting data from every open port.

        Web ports are sent a minimal ``GET /`` first; other services are
        expected to speak first.  Non-empty banners (truncated to 200
        characters) are appended to ``results['services']``.
        """
        print(f"\n{Colors.BOLD}[+] Banner Grabbing...{Colors.ENDC}")
        if not self.results['ip_addresses']:
            return
        host = self.results['ip_addresses'][0]
        http_probe = b'GET / HTTP/1.1\r\nHost: ' + self._url_host().encode() + b'\r\n\r\n'

        for port in self.results['open_ports']:
            try:
                with socket.create_connection((host, port), timeout=2) as sock:
                    # NOTE: the probe is plaintext even on TLS ports; servers
                    # there typically answer with an error page or nothing.
                    if port in self.WEB_PORTS:
                        sock.sendall(http_probe)
                    banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
            except OSError:
                # Refused, reset or timed out (silent service) - no banner.
                continue

            if not banner:
                continue
            self.results['services'].append({
                'port': port,
                'service': self.identify_service(port),
                'banner': banner[:200],  # Limit banner length
            })
            print(f"{Colors.OKGREEN} [✓] Port {port}: {banner[:100]}...{Colors.ENDC}")

    def dns_enumeration(self):
        """Query common DNS record types for the target domain.

        Found records are stored in ``results['dns_records']`` keyed by record
        type.  Skipped when the target is an IP address.
        """
        print(f"\n{Colors.BOLD}[+] DNS Enumeration...{Colors.ENDC}")
        if self._target_ip() is not None:
            print(f"{Colors.WARNING} [!] Target is an IP address, skipping DNS enumeration{Colors.ENDC}")
            return

        domain = self.target
        for record_type in self.DNS_RECORD_TYPES:
            try:
                answers = dns.resolver.resolve(domain, record_type)
            except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers):
                # Expected outcome: the record type simply does not exist.
                continue
            except Exception as exc:
                # Best-effort stage: report (timeouts, malformed names, ...)
                # instead of silently discarding, then try the next type.
                print(f"{Colors.WARNING} [!] {record_type} lookup failed: {exc}{Colors.ENDC}")
                continue

            records = [str(rdata) for rdata in answers]
            self.results['dns_records'][record_type] = records
            print(f"{Colors.OKGREEN} [✓] {record_type} Records:{Colors.ENDC}")
            for record in records:
                print(record)

    def subdomain_enumeration(self, wordlist=None):
        """Resolve candidate subdomains of the target concurrently.

        Args:
            wordlist: Optional path to a file with one label per line; blank
                lines, ``#`` comments and duplicates are ignored.  ``None``
                uses ``DEFAULT_SUBDOMAINS``.

        Hits are stored in ``results['subdomains']`` as
        ``{'subdomain': fqdn, 'ip': address}``, sorted by name.  Skipped when
        the target is an IP address.
        """
        print(f"\n{Colors.BOLD}[+] Subdomain Enumeration...{Colors.ENDC}")
        if self._target_ip() is not None:
            print(f"{Colors.WARNING} [!] Target is an IP address, skipping subdomain enumeration{Colors.ENDC}")
            return

        if wordlist is None:
            subdomains = list(self.DEFAULT_SUBDOMAINS)
        else:
            try:
                with open(wordlist, 'r', encoding='utf-8', errors='replace') as handle:
                    subdomains = self._clean_wordlist(handle)
            except FileNotFoundError:
                print(f"{Colors.FAIL} [✗] Wordlist not found{Colors.ENDC}")
                return
            except OSError as exc:
                # Permission denied, path is a directory, ...
                print(f"{Colors.FAIL} [✗] Could not read wordlist: {exc}{Colors.ENDC}")
                return

        def check_subdomain(subdomain):
            """Return ``(fqdn, ip)`` if the name resolves, otherwise ``None``."""
            target_domain = f"{subdomain}.{self.target}"
            try:
                return (target_domain, socket.gethostbyname(target_domain))
            except (OSError, UnicodeError):
                # UnicodeError: label rejected by the IDNA codec.
                return None

        print(f"{Colors.OKCYAN} [*] Testing {len(subdomains)} subdomains...{Colors.ENDC}")
        with ThreadPoolExecutor(max_workers=50) as executor:
            pending = [executor.submit(check_subdomain, sub) for sub in subdomains]
            for future in as_completed(pending):
                hit = future.result()
                if hit is None:
                    continue
                subdomain, ip = hit
                self.results['subdomains'].append({'subdomain': subdomain, 'ip': ip})
                print(f"{Colors.OKGREEN} [✓] Found: {subdomain} -> {ip}{Colors.ENDC}")

        # Completion order is arbitrary; sort for a stable report.
        self.results['subdomains'].sort(key=lambda entry: entry['subdomain'])

    def web_technology_detection(self):
        """Fetch each open web port and record headers plus guessed technologies.

        Results are stored in ``results['web_technologies']`` keyed by URL.
        Certificate verification is disabled on purpose so hosts with
        self-signed or mismatched certificates can still be inspected.
        """
        import warnings  # stdlib; local because only this stage needs it

        print(f"\n{Colors.BOLD}[+] Web Technology Detection...{Colors.ENDC}")
        web_ports = [port for port in self.results['open_ports'] if port in self.WEB_PORTS]
        if not web_ports:
            print(f"{Colors.WARNING} [!] No web services detected{Colors.ENDC}")
            return

        host = self._url_host()
        for port in web_ports:
            protocol = 'https' if port in self.TLS_PORTS else 'http'
            # Default ports are left implicit in the URL.
            url = f"{protocol}://{host}" if port in (80, 443) else f"{protocol}://{host}:{port}"
            try:
                with warnings.catch_warnings():
                    # verify=False is deliberate; silence the per-request
                    # "Unverified HTTPS request" warning it triggers.
                    warnings.filterwarnings('ignore', message='Unverified HTTPS request')
                    response = requests.get(url, timeout=5, verify=False, allow_redirects=True)
            except requests.exceptions.RequestException:
                print(f"{Colors.WARNING} [!] Failed to connect to {url}{Colors.ENDC}")
                continue

            content = response.text.lower()
            technologies = [name for name, needle in self.TECH_SIGNATURES if needle in content]
            tech_info = {
                'url': url,
                'status_code': response.status_code,
                'server': response.headers.get('Server', 'Unknown'),
                'powered_by': response.headers.get('X-Powered-By', 'Unknown'),
                'headers': dict(response.headers),
                'detected_technologies': technologies,
            }
            self.results['web_technologies'][url] = tech_info

            print(f"{Colors.OKGREEN} [✓] {url}{Colors.ENDC}")
            print(f" Status: {response.status_code}")
            print(f" Server: {tech_info['server']}")
            if technologies:
                print(f" Technologies: {', '.join(technologies)}")

    def whois_lookup(self):
        """Run the system ``whois`` command for a domain target.

        The raw output is stored in ``results['whois']`` and the registrar,
        creation date and expiry date are printed when present.  Skipped for
        IP addresses.
        """
        print(f"\n{Colors.BOLD}[+] WHOIS Lookup...{Colors.ENDC}")
        if self._target_ip() is not None:
            print(f"{Colors.WARNING} [!] WHOIS for IP addresses not implemented{Colors.ENDC}")
            return
        if self.target.startswith('-'):
            # Would be parsed by whois as a command-line option, not a query.
            print(f"{Colors.WARNING} [!] WHOIS lookup failed{Colors.ENDC}")
            return

        try:
            result = subprocess.run(
                ['whois', self.target],
                capture_output=True,
                text=True,
                errors='replace',  # registries do not always answer in UTF-8
                timeout=10,
            )
        except FileNotFoundError:
            print(f"{Colors.WARNING} [!] whois command not found{Colors.ENDC}")
            return
        except subprocess.TimeoutExpired:
            print(f"{Colors.WARNING} [!] WHOIS lookup timed out{Colors.ENDC}")
            return

        if result.returncode != 0:
            print(f"{Colors.WARNING} [!] WHOIS lookup failed{Colors.ENDC}")
            return

        whois_data = result.stdout
        self.results['whois'] = whois_data
        for label, pattern in self.WHOIS_FIELDS:
            match = pattern.search(whois_data)
            if match:
                # strip(): "." also matches the "\r" of CRLF-terminated output.
                print(f"{Colors.OKGREEN} [✓] {label}: {match.group(1).strip()}{Colors.ENDC}")

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def save_results(self):
        """Write ``self.results`` as indented JSON to ``output_file`` (no-op if unset)."""
        if not self.output_file:
            return
        try:
            with open(self.output_file, 'w', encoding='utf-8') as handle:
                json.dump(self.results, handle, indent=4)
        except (OSError, TypeError, ValueError) as exc:
            # OSError: path problems; TypeError/ValueError: unserialisable data.
            print(f"\n{Colors.FAIL}[✗] Failed to save results: {exc}{Colors.ENDC}")
        else:
            print(f"\n{Colors.OKGREEN}[✓] Results saved to: {self.output_file}{Colors.ENDC}")

    def print_summary(self):
        """Print scan summary"""
        rule = '=' * 60
        print(f"\n{Colors.BOLD}{rule}")
        print("SCAN SUMMARY")
        print(f"{rule}{Colors.ENDC}")
        print(f"{Colors.OKCYAN}Target: {self.target}")
        print(f"IP Addresses: {', '.join(self.results['ip_addresses'])}")
        print(f"Open Ports: {len(self.results['open_ports'])}")
        print(f"Services Detected: {len(self.results['services'])}")
        print(f"Subdomains Found: {len(self.results['subdomains'])}")
        print(f"Scan Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.ENDC}")
        print(f"{Colors.BOLD}{rule}{Colors.ENDC}\n")

    def run_full_scan(self, ports=None, subdomain_wordlist=None):
        """Run every scan stage in order, then print the summary and save the report.

        Args:
            ports: Forwarded to ``port_scan``.
            subdomain_wordlist: Forwarded to ``subdomain_enumeration``.
        """
        self.banner()
        self.resolve_target()
        self.port_scan(ports=ports)
        self.banner_grab()
        self.dns_enumeration()
        self.subdomain_enumeration(wordlist=subdomain_wordlist)
        self.web_technology_detection()
        self.whois_lookup()
        self.print_summary()
        self.save_results()
def _parse_ports(spec):
    """Turn a ``--ports`` value into a sorted list of unique port numbers.

    Args:
        spec: Comma-separated items, each a single port (``443``) or an
            inclusive range (``1-1000``); forms may be mixed
            (``22,80,8000-8100``).

    Returns:
        Ascending list of distinct ports.

    Raises:
        ValueError: An item is not numeric, a range is reversed, a port is
            outside 1-65535, or no port was given at all.
    """
    ports = set()
    for item in spec.split(','):
        item = item.strip()
        if not item:
            continue  # tolerate stray commas such as "80,,443"
        first, dash, last = item.partition('-')
        try:
            start = int(first)
            end = int(last) if dash else start
        except ValueError:
            raise ValueError(f"invalid port specification: {item!r}") from None
        if not 1 <= start <= end <= 65535:
            raise ValueError(f"port out of range 1-65535 or reversed range: {item!r}")
        ports.update(range(start, end + 1))
    if not ports:
        raise ValueError("no ports specified")
    return sorted(ports)


def main(argv=None):
    """Command-line entry point: parse arguments and run a full scan.

    Args:
        argv: Argument list to parse; ``None`` (the default) uses
            ``sys.argv[1:]``.

    Port selection precedence is ``--ports``, then ``--full``, then
    ``--quick``; with none of them the scanner's built-in common-port list is
    used.  An invalid ``--ports`` value ends the program with a usage error
    (exit status 2).  On Ctrl-C the partial results are saved before exiting.
    """
    parser = argparse.ArgumentParser(
        description='DeadHydra Recon Scanner - Advanced Penetration Testing Reconnaissance Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s -t example.com
%(prog)s -t 192.168.1.1 -p 1-1000
%(prog)s -t example.com -o results.json
%(prog)s -t example.com --subdomains wordlist.txt
%(prog)s -t example.com --quick
"""
    )
    parser.add_argument('-t', '--target', required=True,
                        help='Target domain or IP address')
    parser.add_argument('-p', '--ports',
                        help='Port range (e.g., 1-1000) or comma-separated ports')
    parser.add_argument('-o', '--output',
                        help='Output file for results (JSON format)')
    parser.add_argument('--subdomains',
                        help='Subdomain wordlist file')
    parser.add_argument('--quick', action='store_true',
                        help='Quick scan (common ports only)')
    parser.add_argument('--full', action='store_true',
                        help='Full scan (all 65535 ports)')
    args = parser.parse_args(argv)

    # Parse ports
    ports = None
    if args.ports:
        try:
            ports = _parse_ports(args.ports)
        except ValueError as exc:
            # Prints usage plus the message and exits with status 2.
            parser.error(f"argument -p/--ports: {exc}")
    elif args.full:
        ports = list(range(1, 65536))
    elif args.quick:
        ports = [21, 22, 80, 443, 3306, 3389, 8080, 8443]

    # Initialize scanner
    scanner = ReconScanner(args.target, args.output)

    # Run scan
    try:
        scanner.run_full_scan(ports=ports, subdomain_wordlist=args.subdomains)
    except KeyboardInterrupt:
        print(f"\n{Colors.WARNING}[!] Scan interrupted by user{Colors.ENDC}")
        # Keep whatever was collected before the interrupt.
        scanner.save_results()
        sys.exit(0)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()