'''
Summary:
Minimal DHCP server. Parses received DHCP packets, logs their options and
answers Discover/Request packets with Offer/Ack packets that carry the
options a Junos or Cisco IOS client needs to fetch its configuration file.
'''

import socket
import struct
from dataclasses import dataclass, fields
from datetime import datetime

import yaml

from utils.log import log

# Reference docs
# https://docs.microsoft.com/en-us/windows-server/troubleshoot/dynamic-host-configuration-protocol-basics

FORMAT_STRING = (
    "!"     # Specifies network byte order
    "s"     # OP: 1 byte
    "s"     # HTYPE: 1 byte
    "s"     # HLEN: 1 byte
    "s"     # HOPS: 1 byte
    "4s"    # XID: 4 bytes
    "2s"    # SECS: 2 bytes
    "2s"    # FLAGS: 2 bytes
    "4s"    # CIADDR: 4 bytes
    "4s"    # YIADDR: 4 bytes
    "4s"    # SIADDR: 4 bytes
    "4s"    # GIADDR: 4 bytes
    "6s"    # CHADDR: 6 bytes
    "10s"   # CHADDR padding: 10 bytes
    "192s"  # OVERFLOW: 192 bytes
    "4s"    # MAGICCOOKIE: 4 bytes
)           # 240 bytes total

OPTION_DESCRIPTIONS = {
    1: 'Subnet Mask',
    3: 'Default Gateway',
    12: 'Hostname',
    43: 'Vendor Specific Information',
    50: 'Requested IP Address',
    51: 'IP Address Lease Time',
    53: 'DHCP Message Type',
    54: 'DHCP Server IP Address',
    55: 'Parameter Request List',
    57: 'DHCP Maximum Message Size',
    60: 'Class Identifier',
    61: 'Client Identifier',
    67: 'Boot File Name',
    150: 'TFTP Server IP Address',
}

# Number of bytes described by FORMAT_STRING; everything after it is options.
_HEADER_SIZE = 240

# Option codes with special framing: neither is followed by a length byte.
_OPTION_PAD = 0
_OPTION_END = 255
_OPTION_MESSAGE_TYPE = 53

# Values of option 53 that the server reacts to.
_MESSAGE_DISCOVER = 1
_MESSAGE_REQUEST = 3
_MESSAGE_RELEASE = 7

# Reply kinds accepted by Packet.construct_reply_packet -> option 53 value.
_REPLY_MESSAGE_TYPES = {'offer': 2, 'ack': 5}

# Values placed in every reply.
# NOTE(review): these are hard-coded rather than taken from the settings file
# loaded by DHCPServer - confirm whether they should follow the settings.
_SERVER_IP = bytes([192, 168, 1, 7])            # Server / TFTP IP: 192.168.1.7
_LEASE_TIME = bytes([0x00, 0x01, 0x51, 0x80])   # Lease time: 86400
_SUBNET_MASK = bytes([255, 255, 255, 254])      # Subnet mask: 255.255.255.254

# How make_human_readable renders each option number.
_IPV4_OPTIONS = frozenset({1, 3, 50, 54, 150})
_TEXT_OPTIONS = frozenset({12, 43, 60, 61, 67})
_INTEGER_OPTIONS = frozenset({51, 57})
_OPTION_PARAMETER_LIST = 55


@dataclass()
class DHCPPacket:
    '''
    Summary:
    Raw fields of a DHCP packet. Field order matches the order on the wire,
    so joining the fields in declaration order yields the packet bytes.
    '''
    # Based on DHCP Discovery packet header from: https://en.wikipedia.org/wiki/Dynamic_Host_Configuration_Protocol
    # Packet header field byte sizing: https://support.huawei.com/enterprise/en/doc/EDOC1100058931/25cd2dfc/dhcp-messages
    # DHCP packet types: https://www.omnisecu.com/tcpip/dhcp-dynamic-host-configuration-protocol-message-options.php
    OP: bytes
    HTYPE: bytes
    HLEN: bytes
    HOPS: bytes
    XID: bytes
    SECS: bytes
    FLAGS: bytes
    CIADDR: bytes           # Client IP address
    YIADDR: bytes           # "Your" IP address: the address offered to the client
    SIADDR: bytes           # Server IP address
    GIADDR: bytes           # Gateway IP address
    CHADDR: bytes           # Client hardware address
    CHADDR_PADDING: bytes
    OVERFLOW: bytes         # Overflow space for BOOTP legacy
    MCOOKIE: bytes
    OPTIONS: bytes
    END: bytes = bytes([0xFF])

    @classmethod
    def from_bytes(cls, raw_packet):
        '''
        Summary:
        Splits a raw packet into the fixed 240 byte header and the options.

        Takes:
        raw_packet: raw DHCP packet as bytes

        Returns:
        A DHCPPacket whose OPTIONS field holds every byte after the header

        Raises:
        struct.error: if raw_packet is shorter than the fixed header
        '''
        header_fields = struct.unpack(FORMAT_STRING, raw_packet[:_HEADER_SIZE])
        return cls(*header_fields, OPTIONS=raw_packet[_HEADER_SIZE:])

    def to_bytes(self):
        '''
        Summary:
        Serialises the packet by joining all fields in declaration order.

        Returns:
        The packet as a bytes object
        '''
        return b''.join(getattr(self, field.name) for field in fields(self))


class Packet:
    '''
    Summary:
    Stateless helpers for parsing received DHCP packets and building replies.
    '''

    @staticmethod
    def construct_tlv(_type, value):
        '''
        Summary:
        Constructs a DHCP TLV - Type | Length | Value.

        Takes:
        _type: typically the DHCP option number
        value: value of the TLV, text (str, encoded as UTF-8) or bytes

        Returns:
        A bytes object TLV

        Raises:
        ValueError: if the encoded value does not fit the 1 byte length field
        '''
        if isinstance(value, str):
            value = value.encode()
        if len(value) > 255:
            raise ValueError(
                f'TLV value is {len(value)} bytes long, the maximum is 255'
            )
        return bytes([_type, len(value)]) + value

    @staticmethod
    def construct_junos_suboptions(config_filename):
        '''
        Summary:
        Constructs Junos specific suboptions required for Zero Touch Provisioning to work.
        0: firmware file
        1: configuration file
        3: transfer mode

        Additional options are available, see:
        https://www.juniper.net/documentation/us/en/software/junos/junos-install-upgrade/topics/topic-map/zero-touch-provision.html#id-zero-touch-provisioning-using-dhcp-options

        Takes:
        config_filename: filename and path of the configuration file to be retrieved and applied to the client

        Returns:
        A bytes object: option 43 wrapping the suboption TLVs
        '''
        # Suboption 0 (image filename and path, for upgrading firmware) is
        # intentionally not sent.
        suboptions = [
            Packet.construct_tlv(1, config_filename),   # Configuration filename and path
            Packet.construct_tlv(3, 'tftp'),            # Transfer mode
        ]
        return Packet.construct_tlv(_type=43, value=b''.join(suboptions))

    @staticmethod
    def construct_reply_packet(reference_packet, _type, source_ip, topology):
        '''
        Summary:
        Constructs a DHCP packet of type Offer or Ack, modelled from a received packet
        with the header fields OP, YIADDR and SIADDR rewritten and the options replaced.
        Ack packets are the exact same as Offer packets, just with the packet type TLV adjusted.
        One extra option is added based on the client operating system reported by the
        topology: option 43 suboptions for 'junos', option 67 (boot file) for 'cisco_ios'.
        Any other operating system receives only the common options.

        Takes:
        reference_packet: received DHCP packet to model the reply packet from
        _type: type of packet to construct, 'offer' or 'ack'
        source_ip: IP address that the DHCP packet was received from
        topology: networkx Graph object detailing the network topology

        Returns:
        DHCP packet of type Offer or Ack as bytes

        Raises:
        ValueError: if _type is neither 'offer' nor 'ack'
        struct.error: if reference_packet is shorter than the fixed header
        '''
        if _type not in _REPLY_MESSAGE_TYPES:
            raise ValueError(f'Unsupported DHCP reply type: {_type!r}')

        reply = DHCPPacket.from_bytes(reference_packet)
        giaddr, yiaddr = Packet.process_giaddr(source_ip)
        client_device_name, client_device_os = topology.get_client_calling_for_ip(source_ip, _type)

        reply.OP = bytes([0x02])
        reply.YIADDR = bytes(yiaddr)    # Client IP
        reply.SIADDR = _SERVER_IP       # Server IP

        # The options are always rebuilt from scratch so the reply never
        # carries the client's own options (message type, end marker, ...).
        options = [
            bytes([_OPTION_MESSAGE_TYPE, 1, _REPLY_MESSAGE_TYPES[_type]]),
            bytes([54, 4]) + _SERVER_IP,    # Server identifier
            bytes([51, 4]) + _LEASE_TIME,   # Lease time
            bytes([1, 4]) + _SUBNET_MASK,   # Subnet mask
            bytes([3, 4, *giaddr]),         # Default gateway
            bytes([150, 4]) + _SERVER_IP,   # TFTP server
        ]

        config_path = f'/configs/{client_device_name}.conf'
        if client_device_os == 'junos':
            options.append(Packet.construct_junos_suboptions(config_path))  # Juniper specific suboptions
        elif client_device_os == 'cisco_ios':
            options.append(Packet.construct_tlv(67, config_path))           # Cisco specific bootfile

        reply.OPTIONS = b''.join(options)
        return reply.to_bytes()

    @staticmethod
    def examine_packet(raw_packet):
        '''
        Summary:
        Examines a DHCP packet and splits it into packet data and DHCP options.
        It's expected that DHCP packet length after 240 will be DHCP options.

        Takes:
        raw_packet: raw DHCP packet to examine

        Returns:
        packet: DHCPPacket holding the header fields and the raw options
        options: dictionary of human readable option values keyed by option number
        options_descriptions: dictionary of option descriptions keyed by option number

        Raises:
        struct.error: if raw_packet is shorter than the fixed header
        '''
        packet = DHCPPacket.from_bytes(raw_packet)
        options, options_descriptions = Packet.extract_options(packet.OPTIONS)
        return packet, options, options_descriptions

    @staticmethod
    def extract_options(packet_data):
        '''
        Summary:
        Extracts DHCP options from the options area of a DHCP packet.
        Parsing stops at the end option (255) or as soon as the remaining
        bytes are too short to hold the option being read. Pad options (0)
        are skipped. If an option number occurs twice, the last one wins.

        Takes:
        packet_data: options area of a DHCP packet as raw bytes

        Returns:
        option_dict: dictionary of DHCP options extracted from a packet
        option_description_dict: dictionary of DHCP option descriptions that were found in a packet
        '''
        option_dict = {}
        option_description_dict = {}
        total_length = len(packet_data)

        index = 0
        while index < total_length:
            option_number = packet_data[index]
            if option_number == _OPTION_END:
                break
            if option_number == _OPTION_PAD:
                # Pad is a single byte without length or value.
                index += 1
                continue
            if index + 1 >= total_length:
                # Truncated: option number without a length byte.
                break

            option_length = packet_data[index + 1]
            raw_value = packet_data[index + 2:index + 2 + option_length]
            if len(raw_value) < option_length:
                # Truncated: the value runs past the end of the data.
                break

            option_value, option_description = Packet.make_human_readable(option_number, raw_value)
            option_dict[option_number] = option_value
            option_description_dict[option_number] = option_description
            index += 2 + option_length

        return option_dict, option_description_dict

    @staticmethod
    def make_human_readable(option_number, raw_value):
        '''
        Summary:
        Takes a raw DHCP option and makes it human readable.
        Values that cannot be rendered in the expected form (too short, or
        not valid UTF-8 for text options) are returned as raw bytes.

        DHCP options: https://www.iana.org/assignments/bootp-dhcp-parameters/bootp-dhcp-parameters.xhtml

        Takes:
        option_number: DHCP option number
        raw_value: raw bytes value received with the option number

        Returns:
        option_value: human readable value of the option number
        option_description: description of the option, 'Unknown' if not in OPTION_DESCRIPTIONS
        '''
        if option_number in _IPV4_OPTIONS and len(raw_value) >= 4:
            # Only the first address is rendered, even if more follow.
            option_value = '.'.join(str(octet) for octet in raw_value[:4])
        elif option_number == _OPTION_PARAMETER_LIST:
            option_value = list(raw_value)
        elif option_number == _OPTION_MESSAGE_TYPE and raw_value:
            option_value = raw_value[0]
        elif option_number in _TEXT_OPTIONS:
            try:
                option_value = raw_value.decode()
            except UnicodeDecodeError:
                # Client identifiers and vendor data are often binary.
                option_value = raw_value
        elif option_number in _INTEGER_OPTIONS:
            option_value = int.from_bytes(raw_value, byteorder='big')
        else:
            option_value = raw_value

        option_description = OPTION_DESCRIPTIONS.get(option_number, 'Unknown')
        return option_value, option_description

    @staticmethod
    def process_giaddr(giaddr):
        '''
        Summary:
        Calculates IP address to offer the client and turns string giaddr into a list of 4 integers.
        The offered address is giaddr with its last octet moved to the other
        value of the same even/odd pair (even -> +1, odd -> -1).

        Takes:
        giaddr: gateway or relay IP that the DHCP packet was received from (string)

        Returns:
        giaddr: gateway or relay IP that the DHCP packet was received from (list of 4 integers)
        yiaddr: IP to offer the client (list of 4 integers)

        Raises:
        ValueError: if giaddr is not four dot separated integers
        '''
        octets = [int(octet) for octet in giaddr.split('.')]
        if len(octets) != 4:
            raise ValueError(f'Expected a dotted IPv4 address, got {giaddr!r}')

        # Flipping the lowest bit maps even -> +1 and odd -> -1.
        yiaddr = octets[:3] + [octets[3] ^ 1]
        return octets, yiaddr


class DHCPServer(object):
    '''
    Summary:
    DHCP server that listens on a UDP socket configured from a YAML settings file.
    '''

    def __init__(self, SETTINGS_FILE):
        '''
        Summary:
        Loads the server settings.

        Takes:
        SETTINGS_FILE: path of a YAML file providing DHCP.SERVER_IP,
                       DHCP.SERVER_PORT, DHCP.MAX_BYTES and TFTP.SERVER_IP
        '''
        with open(SETTINGS_FILE) as settings_file:
            self.SETTINGS = yaml.load(settings_file, Loader=yaml.SafeLoader)
        self.DHCP_SERVER_IP = self.SETTINGS['DHCP']['SERVER_IP']
        self.DHCP_SERVER_PORT = self.SETTINGS['DHCP']['SERVER_PORT']
        self.MAX_BYTES = self.SETTINGS['DHCP']['MAX_BYTES']
        self.TFTP_SERVER_IP = self.SETTINGS['TFTP']['SERVER_IP']

    def create_socket(self):
        '''
        Summary:
        Creates a socket to allow the DHCP server to send and receive data to/from.

        Socket characteristics:
        AF_INET: type of socket, address format (host, port)
        SOCK_DGRAM: socket protocol, UDP

        SOL_SOCKET (socket options):
        SO_REUSEADDR: socket address and port can be reused
        SO_BROADCAST: datagrams can be broadcast from this socket

        Socket bound to:
        SERVER_IP
        PORT

        Returns:
        The bound socket; the caller is responsible for closing it
        '''
        _socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            _socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            _socket.bind((self.DHCP_SERVER_IP, self.DHCP_SERVER_PORT))
        except OSError:
            # Do not leak the descriptor when configuring or binding fails.
            _socket.close()
            raise
        return _socket

    @staticmethod
    def _log_options(title, options, descriptions):
        '''
        Summary:
        Logs a title followed by one line per DHCP option.
        '''
        log(title, 'info')
        for option_number, option_value in options.items():
            log(f'{option_number} [{descriptions[option_number]}]: {option_value}', 'info')

    def _send_reply(self, server_socket, received_packet, reply_type, source_address, topology):
        '''
        Summary:
        Builds an Offer or Ack for the received packet, logs its options and sends it.
        '''
        log(f'Constructing DHCP {reply_type} packet...', 'info')
        reply_packet = Packet.construct_reply_packet(received_packet, reply_type, source_address, topology)
        _, reply_options, reply_descriptions = Packet.examine_packet(reply_packet)
        self._log_options(f'{reply_type.capitalize()} packet options:', reply_options, reply_descriptions)
        # The reply goes to the sender's IP on the server port, not to the
        # port the packet was received from.
        server_socket.sendto(reply_packet, (source_address, self.DHCP_SERVER_PORT))

    def run(self, topology):
        '''
        Summary:
        Runs the DHCP server allowing packets to be sent and received through the created socket.
        Discover packets are answered with an Offer, Request packets with an
        Ack, both modelled on the packet that was just received. Runs until
        interrupted with KeyboardInterrupt; the socket is closed on exit.

        Takes:
        topology: networkx Graph object detailing the network topology
        '''
        log('DHCP server is starting...', 'info')
        log(f'DHCP server IP: {self.DHCP_SERVER_IP} running on port {self.DHCP_SERVER_PORT}', 'info')
        log(f'TFTP server IP: {self.TFTP_SERVER_IP}', 'info')

        server_socket = self.create_socket()
        try:
            while True:
                try:
                    log('Waiting for DHCP packets...', 'info')
                    received_packet, (source_address, source_port) = server_socket.recvfrom(self.MAX_BYTES)
                    log(f'DHCP packet received from IP {source_address} on port {source_port}', 'info')

                    try:
                        _, packet_options, option_descriptions = Packet.examine_packet(received_packet)
                    except struct.error:
                        # Shorter than the fixed header: not a usable DHCP packet.
                        log('Ignoring malformed DHCP packet', 'info')
                        continue
                    self._log_options('Received packet options:', packet_options, option_descriptions)

                    # Packets without a message type option are ignored.
                    message_type = packet_options.get(_OPTION_MESSAGE_TYPE)

                    if message_type == _MESSAGE_DISCOVER:
                        log('Received DHCP discover packet', 'info')
                        self._send_reply(server_socket, received_packet, 'offer', source_address, topology)

                    elif message_type == _MESSAGE_REQUEST:
                        log('Received DHCP request packet', 'info')
                        # Model the Ack on this request, so it carries this
                        # client's header fields even without a prior Discover.
                        self._send_reply(server_socket, received_packet, 'ack', source_address, topology)

                    elif message_type == _MESSAGE_RELEASE:
                        log('Received DHCP release packet', 'info')
                        log('See you around, partner\U0001F920', 'info')

                except KeyboardInterrupt:
                    log('Exiting...', 'info')
                    break
        finally:
            server_socket.close()