382 lines
16 KiB
Python
382 lines
16 KiB
Python
import socket
|
|
import struct
|
|
import yaml
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from utils.log import log
|
|
|
|
# Reference docs
|
|
# https://docs.microsoft.com/en-us/windows-server/troubleshoot/dynamic-host-configuration-protocol-basics
|
|
|
|
# struct layout of the fixed 240-byte DHCP/BOOTP header that precedes the options.
# Every field is unpacked as a raw byte string ("s"); nothing is converted to an integer here.
FORMAT_STRING = "".join(
    (
        "!",     # Network byte order
        "s",     # OP: 1 byte
        "s",     # HTYPE: 1 byte
        "s",     # HLEN: 1 byte
        "s",     # HOPS: 1 byte
        "4s",    # XID: 4 bytes
        "2s",    # SECS: 2 bytes
        "2s",    # FLAGS: 2 bytes
        "4s",    # CIADDR: 4 bytes
        "4s",    # YIADDR: 4 bytes
        "4s",    # SIADDR: 4 bytes
        "4s",    # GIADDR: 4 bytes
        "6s",    # CHADDR: 6 bytes
        "10s",   # CHADDR padding: 10 bytes
        "192s",  # OVERFLOW: 192 bytes
        "4s",    # MAGICCOOKIE: 4 bytes
    )
)  # 240 bytes total
|
|
|
|
# Human readable labels for the DHCP option numbers this server knows about.
# Options that are not listed here are reported as 'Unknown' when a packet is examined.
OPTION_DESCRIPTIONS = dict(
    [
        # Addressing
        (1, 'Subnet Mask'),
        (3, 'Default Gateway'),
        (12, 'Hostname'),
        (43, 'Vendor Specific Information'),
        # Lease negotiation
        (50, 'Requested IP Address'),
        (51, 'IP Address Lease Time'),
        (53, 'DHCP Message Type'),
        (54, 'DHCP Server IP Address'),
        (55, 'Parameter Request List'),
        (57, 'DHCP Maximum Message Size'),
        # Client identification
        (60, 'Class Identifier'),
        (61, 'Client Identifier'),
        # Provisioning
        (67, 'Boot File Name'),
        (150, 'TFTP Server IP Address'),
    ]
)
|
|
|
|
@dataclass
class DHCPPacket:
    # Raw-bytes view of a DHCP packet. The first fifteen fields correspond, in order, to the
    # fields unpacked with FORMAT_STRING; OPTIONS holds everything after the fixed header and
    # END is the terminating end-of-options marker.
    #
    # Field order matters: reply packets are serialised by joining the field values in
    # declaration order.
    #
    # Based on DHCP Discovery packet header from: https://en.wikipedia.org/wiki/Dynamic_Host_Configuration_Protocol
    # Packet header field byte sizing: https://support.huawei.com/enterprise/en/doc/EDOC1100058931/25cd2dfc/dhcp-messages
    # DHCP packet types: https://www.omnisecu.com/tcpip/dhcp-dynamic-host-configuration-protocol-message-options.php

    OP: bytes              # Message op code (replies are built with 0x02)
    HTYPE: bytes           # Hardware address type
    HLEN: bytes            # Hardware address length
    HOPS: bytes
    XID: bytes             # Transaction ID
    SECS: bytes
    FLAGS: bytes
    CIADDR: bytes          # Client IP address
    YIADDR: bytes          # "Your" IP address: the address offered to the client
    SIADDR: bytes          # Server IP address
    GIADDR: bytes          # Gateway IP address
    CHADDR: bytes          # Client hardware address
    CHADDR_PADDING: bytes  # Padding that follows the 6 byte hardware address
    OVERFLOW: bytes        # Overflow space for BOOTP legacy
    MCOOKIE: bytes         # Magic cookie
    OPTIONS: bytes         # DHCP options (TLV encoded)
    END: bytes = b'\xff'   # End-of-options marker
|
|
|
|
class Packet():
    '''
    Summary:
    Stateless helpers for building and parsing raw DHCP packets.
    All methods are static; the class is only used as a namespace.
    '''

    @staticmethod
    def construct_tlv(_type, value):
        '''
        Summary:
        Constructs a DHCP TLV - Type | Length | Value.

        Takes:
        _type: typically the DHCP option number (must fit in one byte)
        value: value of the TLV, either text (encoded to bytes) or a bytes object

        Returns:
        A bytes object TLV

        Raises:
        ValueError: if _type or the length of value does not fit in a single byte
        '''
        if isinstance(value, str):
            value = value.encode()
        return bytes([_type, len(value)]) + value

    @staticmethod
    def construct_junos_suboptions(config_filename):
        '''
        Summary:
        Constructs Junos specific suboptions required for Zero Touch Provisioning to work.
        The suboptions are wrapped in option 43 (Vendor Specific Information).

        0: firmware file (not sent)
        1: configuration file
        3: transfer mode

        Additional options are available, see:
        https://www.juniper.net/documentation/us/en/software/junos/junos-install-upgrade/topics/topic-map/zero-touch-provision.html#id-zero-touch-provisioning-using-dhcp-options

        Takes:
        config_filename: filename and path of the configuration file to be retrieved and applied to the client

        Returns:
        A bytes object consisting of multiple TLVs
        '''
        suboptions = [
            Packet.construct_tlv(1, config_filename),  # Configuration filename and path
            Packet.construct_tlv(3, 'tftp'),  # Transfer mode
        ]
        return Packet.construct_tlv(_type=43, value=b''.join(suboptions))

    @staticmethod
    def _ip_octets(ip_address):
        '''
        Summary:
        Splits a dotted IPv4 string into a list of integer octets.
        '''
        return [int(octet) for octet in ip_address.split('.')]

    @staticmethod
    def _build_reply_options(client_device_os, _type, giaddr, client_device_name, settings):
        '''
        Summary:
        Builds the DHCP options of an Offer/Ack for a supported client operating system.
        Both supported operating systems share the same options and differ only in how the
        configuration file is announced (Junos suboptions vs. boot file name).

        Takes:
        client_device_os: 'junos' or 'cisco_ios'
        _type: 'offer' for an Offer, anything else produces an Ack
        giaddr: default gateway for the client (list of 4 integers)
        client_device_name: device name used to build the configuration file path
        settings: settings containing ['DHCP']['SERVER_IP'] and ['TFTP']['SERVER_IP']

        Returns:
        A bytes object of TLVs, or None when the operating system is not supported
        '''
        config_path = f'/configs/{client_device_name}.conf'

        if client_device_os == 'junos':
            config_option = Packet.construct_junos_suboptions(config_path)  # Juniper specific suboptions
        elif client_device_os == 'cisco_ios':
            config_option = Packet.construct_tlv(67, config_path)  # Cisco specific bootfile
        else:
            return None

        return b''.join(
            [
                # DHCP message type: 2 = Offer, 5 = Ack. 3 would be a Request and must
                # never be sent by the server.
                bytes([53, 1, 2 if _type == 'offer' else 5]),
                bytes([54, 4] + Packet._ip_octets(settings['DHCP']['SERVER_IP'])),  # Server identifier
                bytes([51, 4, 0x00, 0x01, 0x51, 0x80]),  # Lease time: 86400
                bytes([1, 4, 255, 255, 255, 254]),  # Subnet mask: 255.255.255.254
                bytes([3, 4] + list(giaddr[:4])),  # Default gateway
                bytes([150, 4] + Packet._ip_octets(settings['TFTP']['SERVER_IP'])),  # TFTP server
                config_option,
            ]
        )

    @staticmethod
    def construct_reply_packet(reference_packet, _type, source_ip, topology, settings):
        '''
        Summary:
        Constructs a DHCP packet of type Offer or Ack, modelled from a received packet with some options changed/added.
        Ack packets are the exact same as Offer packets, just with the packet type TLV adjusted.
        Different options are changed/added based on the client operating system reported by the topology.

        Takes:
        reference_packet: received DHCP packet to model the reply packet from (at least 240 bytes)
        _type: type of packet to construct, 'offer' or 'ack'
        source_ip: IP address that the DHCP packet was received from
        topology: object providing get_client_calling_for_ip(source_ip, _type), which returns
                  the client device name and operating system
        settings: settings containing DHCP and TFTP server IPs, required for the packet contents

        Returns:
        DHCP packet of type Offer or Ack as bytes

        Raises:
        struct.error: if reference_packet is shorter than the 240 byte header
        '''
        header_fields = struct.unpack(FORMAT_STRING, reference_packet[:240])
        reply = DHCPPacket(*header_fields, OPTIONS=reference_packet[240:])

        giaddr, yiaddr = Packet.process_giaddr(source_ip)
        client_device_name, client_device_os = topology.get_client_calling_for_ip(source_ip, _type)

        reply.OP = bytes([0x02])
        reply.YIADDR = bytes(yiaddr[:4])  # Client IP
        reply.SIADDR = bytes(Packet._ip_octets(settings['DHCP']['SERVER_IP']))  # Server IP

        reply_options = Packet._build_reply_options(client_device_os, _type, giaddr, client_device_name, settings)
        if reply_options is not None:
            reply.OPTIONS = reply_options
        # NOTE(review): for an unsupported operating system the options of the reference
        # packet are echoed back unchanged - confirm this is the intended fallback.

        # Fields are serialised in declaration order, ending with the END marker.
        return b''.join(vars(reply).values())

    @staticmethod
    def examine_packet(raw_packet):
        '''
        Summary:
        Examines a DHCP packet and splits it into packet data and DHCP options.
        It's expected that DHCP packet length after 240 will be DHCP options.

        Takes:
        raw_packet: raw DHCP packet to examine (at least 240 bytes)

        Returns:
        packet: DHCPPacket holding the header fields and the raw options
        options: DHCP packet options
        options_descriptions: DHCP packet options descriptions

        Raises:
        struct.error: if raw_packet is shorter than the 240 byte header
        '''
        header_fields = struct.unpack(FORMAT_STRING, raw_packet[:240])
        packet = DHCPPacket(*header_fields, OPTIONS=raw_packet[240:])

        options, options_descriptions = Packet.extract_options(packet.OPTIONS)

        return packet, options, options_descriptions

    @staticmethod
    def extract_options(packet_data):
        '''
        Summary:
        Extracts DHCP options from the options section of a DHCP packet.
        Parsing stops at the End option (255) or at the first truncated option.
        Pad options (0) are skipped.

        Takes:
        packet_data: DHCP options as raw bytes

        Returns:
        option_dict: dictionary of DHCP options extracted from a packet
        option_description_dict: dictionary of DHCP option descriptions that were found in a packet
        '''
        option_dict = {}
        option_description_dict = {}

        total_length = len(packet_data)
        index = 0
        while index < total_length:
            option_number = packet_data[index]
            if option_number == 255:
                break
            if option_number == 0:
                # Pad is a single byte without a length field.
                index += 1
                continue
            if index + 1 >= total_length:
                # Option number without a length byte: the data is truncated.
                break

            option_length = packet_data[index + 1]
            raw_value = packet_data[index + 2:index + 2 + option_length]
            if len(raw_value) < option_length:
                # Announced length runs past the end of the data: stop parsing.
                break

            option_value, option_description = Packet.make_human_readable(option_number, raw_value)
            option_dict[option_number] = option_value
            option_description_dict[option_number] = option_description
            index += 2 + option_length

        return option_dict, option_description_dict

    @staticmethod
    def _decode_option_value(option_number, raw_value):
        '''
        Summary:
        Converts the raw value of a DHCP option into a human readable form.
        Values that cannot be converted (too short, not valid text) are returned as raw bytes.

        Takes:
        option_number: DHCP option number
        raw_value: raw bytes value received with the option number

        Returns:
        Dotted IP string, list of integers, integer, text or the raw bytes, depending on the option
        '''
        if option_number in (1, 3, 50, 54, 150):
            # IP address options: only the first address is rendered.
            if len(raw_value) < 4:
                return raw_value
            return '.'.join(str(octet) for octet in raw_value[:4])
        if option_number == 55:
            return list(raw_value)
        if option_number == 53:
            return raw_value[0] if raw_value else raw_value
        if option_number in (12, 43, 60, 61, 67):
            # Options 43 and 61 frequently carry binary data (e.g. a hardware address),
            # which is not guaranteed to be decodable text.
            try:
                return raw_value.decode()
            except UnicodeDecodeError:
                return raw_value
        if option_number in (51, 57):
            return int.from_bytes(raw_value, byteorder='big')
        return raw_value

    @staticmethod
    def make_human_readable(option_number, raw_value):
        '''
        Summary:
        Takes a raw DHCP option and makes it human readable.

        DHCP options:
        https://www.iana.org/assignments/bootp-dhcp-parameters/bootp-dhcp-parameters.xhtml

        Takes:
        option_number: DHCP option number
        raw_value: raw bytes value received with the option number

        Returns:
        option_value: human readable value of the option number
        option_description: description of the option, 'Unknown' if the option is not listed
        '''
        option_value = Packet._decode_option_value(option_number, raw_value)
        option_description = OPTION_DESCRIPTIONS.get(option_number, 'Unknown')

        return option_value, option_description

    @staticmethod
    def process_giaddr(giaddr):
        '''
        Summary:
        Calculates IP address to offer the client and turns string giaddr into a list of 4 integers.
        The offered address is the other address of the pair: an even last octet is
        incremented, an odd last octet is decremented.

        Takes:
        giaddr: gateway or relay IP that the DHCP packet was received from (string)

        Returns:
        giaddr: gateway or relay IP that the DHCP packet was received from (list of 4 integers)
        yiaddr: IP to offer the client (list of 4 integers)
        '''
        octets = giaddr.split('.')
        giaddr = [int(octets[0]), int(octets[1]), int(octets[2]), int(octets[3])]

        last_octet = giaddr[3]
        peer_octet = last_octet + 1 if last_octet % 2 == 0 else last_octet - 1
        yiaddr = giaddr[:3] + [peer_octet]

        return giaddr, yiaddr
|
|
|
|
class DHCPServer(object):
    '''
    Summary:
    DHCP server that answers Discover packets with an Offer and Request packets with an Ack,
    using the Packet helpers to build the replies.
    '''

    def __init__(self, SETTINGS_FILE):
        '''
        Summary:
        Constructs the DHCPServer object using the contents of the settings YAML file.

        Takes:
        SETTINGS_FILE: path of the YAML settings file, which must provide
                       DHCP.SERVER_IP, DHCP.SERVER_PORT, DHCP.MAX_BYTES and TFTP.SERVER_IP
        '''
        # Context manager so the settings file handle is closed once parsed.
        with open(SETTINGS_FILE) as settings_file:
            self.SETTINGS = yaml.load(settings_file, Loader=yaml.SafeLoader)

        dhcp_settings = self.SETTINGS['DHCP']
        self.DHCP_SERVER_IP = dhcp_settings['SERVER_IP']
        self.DHCP_SERVER_PORT = dhcp_settings['SERVER_PORT']
        self.MAX_BYTES = dhcp_settings['MAX_BYTES']
        self.TFTP_SERVER_IP = self.SETTINGS['TFTP']['SERVER_IP']

    def create_socket(self):
        '''
        Summary:
        Creates a socket to allow the DHCP server to send and receive data to/from.

        Socket characteristics:
        AF_INET: type of socket, address format (host, port)
        SOCK_DGRAM: socket protocol, UDP
        SOL_SOCKET (socket options):
        SO_REUSEADDR: socket address and port can be reused
        SO_BROADCAST: datagrams can be broadcast from this socket

        Socket bound to:
        SERVER_IP
        PORT

        Returns:
        The bound socket; the caller is responsible for closing it

        Raises:
        OSError: if the socket cannot be configured or bound
        '''
        _socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            _socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            _socket.bind((self.DHCP_SERVER_IP, self.DHCP_SERVER_PORT))
        except OSError:
            # Do not leak the descriptor when configuring/binding fails.
            _socket.close()
            raise

        return _socket

    @staticmethod
    def _log_options(title, packet_options, option_descriptions):
        '''
        Summary:
        Logs a title followed by one line per DHCP option.
        '''
        log(title, 'info')
        for option_number, option_value in packet_options.items():
            log(f'{option_number} [{option_descriptions[option_number]}]: {option_value}', 'info')

    def _send_reply(self, server_socket, received_packet, _type, source_address, topology):
        '''
        Summary:
        Builds an Offer or Ack modelled on the packet that was just received, logs its
        options and sends it back to the sender on the DHCP server port.

        Takes:
        server_socket: socket to send the reply through
        received_packet: raw packet the reply answers
        _type: 'offer' or 'ack'
        source_address: IP address the packet was received from
        topology: passed through to Packet.construct_reply_packet
        '''
        log(f'Constructing DHCP {_type} packet...', 'info')
        reply_packet = Packet.construct_reply_packet(received_packet, _type, source_address, topology, self.SETTINGS)
        _, reply_options, reply_descriptions = Packet.examine_packet(reply_packet)
        self._log_options(f'{_type.capitalize()} packet options:', reply_options, reply_descriptions)
        server_socket.sendto(reply_packet, (source_address, self.DHCP_SERVER_PORT))

    def run(self, topology):
        '''
        Summary:
        Runs the DHCP server allowing packets to be sent and received through the created socket.
        Loops until interrupted with Ctrl+C; the socket is closed on exit.

        Takes:
        topology: object describing the network topology, passed to Packet.construct_reply_packet
        '''
        log('DHCP server is starting...', 'info')
        log(f'DHCP server IP: {self.DHCP_SERVER_IP} running on port {self.DHCP_SERVER_PORT}', 'info')
        log(f'TFTP server IP: {self.TFTP_SERVER_IP}', 'info')

        # Named server_socket so the socket module is not shadowed.
        server_socket = self.create_socket()

        try:
            while True:
                try:
                    log('Waiting for DHCP packets...', 'info')
                    received_packet, (source_address, source_port) = server_socket.recvfrom(self.MAX_BYTES)
                    log(f'DHCP packet received from IP {source_address} on port {source_port}', 'info')
                    packet_data, packet_options, option_descriptions = Packet.examine_packet(received_packet)
                    self._log_options('Received packet options:', packet_options, option_descriptions)

                    # .get(): packets without a message type option are ignored instead of
                    # raising KeyError and stopping the server.
                    message_type = packet_options.get(53)

                    if message_type == 1:
                        log('Received DHCP discover packet', 'info')
                        self._send_reply(server_socket, received_packet, 'offer', source_address, topology)

                    elif message_type == 3:
                        log('Received DHCP request packet', 'info')
                        # The Ack is modelled on the Request itself. Reusing the last Offer
                        # would fail when no Offer was sent yet and could answer with the
                        # transaction data of a different client.
                        self._send_reply(server_socket, received_packet, 'ack', source_address, topology)

                    elif message_type == 7:
                        log('Received DHCP release packet', 'info')
                        log('See you around, partner\U0001F920', 'info')

                except KeyboardInterrupt:
                    log('Exiting...', 'info')
                    break
                except struct.error:
                    # Datagram shorter than the fixed 240 byte header: not a DHCP packet.
                    log('Ignoring malformed DHCP packet', 'info')
        finally:
            server_socket.close()
|