130 lines
4.5 KiB
Python
Executable File
130 lines
4.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import ipaddress
|
|
import json
|
|
import os
|
|
|
|
from flask import Flask, request
|
|
|
|
class IPAMLite(object):
    """Minimal JSON-backed IP address manager.

    Leases live in ``self.leased``: a list of dicts, each mapping a network
    in CIDR notation to the list of leased addresses (as strings), e.g.
    ``[{"192.168.0.0/24": ["192.168.0.1", "192.168.0.2"]}]``.  Every
    successful change is written to ``database_file``.
    """

    def __init__(self, database_file='ipam.json'):
        """Create an empty lease table persisted at *database_file*."""
        self.database_file = database_file
        self.leased = []

    def database(self):
        """Return the lease table as a JSON string indented by 4 spaces."""
        return json.dumps(self.leased, indent=4)

    def _leases_for(self, network):
        """Return the stored lease list for *network*, or None if absent.

        Stored keys are parsed rather than compared as text, so an entry
        saved as ``10.0.0.0/255.0.0.0`` still matches ``10.0.0.0/8``.
        Keys that do not parse as a network never match.
        """
        for network_dict in self.leased:
            for key, leased_addresses in network_dict.items():
                try:
                    stored = ipaddress.ip_network(key, strict=False)
                except ValueError:
                    continue
                if stored == network:
                    return leased_addresses
        return None

    def sequential_lease(self, network, subnet):
        """Lease the first free host address of ``network/subnet``.

        Args:
            network: IPv4 network address, e.g. ``'192.168.0.0'``.
            subnet: Prefix length or netmask, e.g. ``24`` or
                ``'255.255.255.0'``.

        Returns:
            A human-readable status message ending in a newline.

        Raises:
            ValueError: If ``network/subnet`` is not a valid IPv4 network
                (including a network address with host bits set).
        """
        target = ipaddress.IPv4Network(f'{network}/{subnet}')
        leased_addresses = self._leases_for(target)
        taken = set(leased_addresses or ())

        # hosts() is consumed lazily so that a large network (e.g. a /8) is
        # never expanded into a full list just to pick one address.
        next_available_lease = next(
            (str(ip) for ip in target.hosts() if str(ip) not in taken),
            None,
        )
        if next_available_lease is None:
            return f'\U0001F916 No address space left in: {network}/{subnet}\n'

        if leased_addresses is None:
            # Key by the normalised CIDR form, the same form specific_lease
            # uses, so both methods share one entry per network.
            self.leased.append({str(target): [next_available_lease]})
        else:
            leased_addresses.append(next_available_lease)

        self.save_database()
        return f'\U0001F916 Lease added: {next_available_lease}/{subnet}\n'

    def specific_lease(self, ip, subnet):
        """Lease the exact address *ip* within the network implied by *subnet*.

        Args:
            ip: Address to lease, e.g. ``'192.168.0.10'``.
            subnet: Prefix length or netmask of the containing network.

        Returns:
            A human-readable status message ending in a newline.

        Raises:
            ValueError: If ``ip/subnet`` is not a valid interface address.
        """
        network_address = ipaddress.ip_interface(f'{ip}/{subnet}').network

        # Check every network before touching anything, so that an
        # "already exists" answer never leaves a half-applied change behind.
        already_leased = any(
            ip in leased_addresses
            for network_dict in self.leased
            for leased_addresses in network_dict.values()
        )
        if already_leased:
            return f'\U0001F916 Lease already exists: {ip}/{subnet}\n'

        leased_addresses = self._leases_for(network_address)
        if leased_addresses is None:
            self.leased.append({str(network_address): [ip]})
        else:
            leased_addresses.append(ip)

        self.save_database()
        return f'\U0001F916 Lease added: {ip}/{subnet}\n'

    def release(self, ip, subnet):
        """Release *ip* from the first stored network that holds it.

        *subnet* is only echoed in the returned message; the lookup is by
        address alone.  When *ip* is the last lease of its entry, the whole
        entry is removed from the table.

        Returns:
            A human-readable status message ending in a newline.
        """
        for index, network_dict in enumerate(self.leased):
            for leased_addresses in network_dict.values():
                if ip not in leased_addresses:
                    continue
                if len(leased_addresses) > 1:
                    leased_addresses.remove(ip)
                else:
                    # Last lease in this entry: drop the entry itself.
                    del self.leased[index]
                self.save_database()
                return f'\U0001F916 Lease removed: {ip}/{subnet}\n'

        return f'\U0001F916 Nothing to release for: {ip}/{subnet}\n'

    def save_database(self):
        """Write the lease table to ``database_file`` as indented JSON."""
        with open(self.database_file, 'w') as ipam_file:
            json.dump(self.leased, ipam_file, indent=4)

    def load_database(self):
        """Load the lease table from ``database_file`` if the file exists.

        The in-memory table is replaced rather than extended, so calling
        this more than once does not duplicate entries.  Prints a status
        line either way.
        """
        if os.path.exists(self.database_file):
            with open(self.database_file, 'r') as ipam_file:
                db = json.load(ipam_file)

            self.leased = list(db)

            print(f'\U0001F916 Database file "{self.database_file}" loaded successfully')
        else:
            print(f'\U0001F916 Cannot load database file "{self.database_file}"')
|
|
|
|
# Single lease store used by every request handler in this module; any
# leases saved by an earlier run are read back in before requests arrive.
IPAM = IPAMLite()
IPAM.load_database()

# Flask application object that the route decorators register against.
app = Flask(__name__)
|
|
|
|
@app.route('/database', methods=['GET'])
def get_database():
    """Return the whole lease table as indented JSON text."""
    lease_table = IPAM.database()
    return lease_table
|
|
|
|
@app.route('/sequential_lease', methods=['POST'])
def handle_sequential_lease():
    """Lease the next free address of the network named in the JSON body.

    Expects a JSON object with ``network`` and ``subnet`` fields.  Returns
    the status message produced by ``IPAM.sequential_lease``.  A body that
    lacks those fields, or names an invalid network, gets a 400 response
    instead of an unhandled exception.
    """
    if not request.is_json:
        return 'Data type not supported, supported data types: JSON\n'

    data = request.json
    try:
        network, subnet = data['network'], data['subnet']
    except (KeyError, TypeError):
        # KeyError: field missing; TypeError: body is not a JSON object.
        return 'Missing required fields: network, subnet\n', 400

    try:
        lease_response = IPAM.sequential_lease(network, subnet)
    except ValueError as error:
        # Raised by the ipaddress module for a malformed network or subnet.
        return f'Invalid network: {error}\n', 400

    return f'{lease_response}'
|
|
|
|
@app.route('/specific_lease', methods=['POST'])
def handle_specific_lease():
    """Lease the exact address named in the JSON body.

    Expects a JSON object with ``ip`` and ``subnet`` fields.  Returns the
    status message produced by ``IPAM.specific_lease``.  A body that lacks
    those fields, or names an invalid address, gets a 400 response instead
    of an unhandled exception.
    """
    if not request.is_json:
        return 'Data type not supported, supported data types: JSON\n'

    data = request.json
    try:
        ip, subnet = data['ip'], data['subnet']
    except (KeyError, TypeError):
        # KeyError: field missing; TypeError: body is not a JSON object.
        return 'Missing required fields: ip, subnet\n', 400

    try:
        lease_response = IPAM.specific_lease(ip, subnet)
    except ValueError as error:
        # Raised by the ipaddress module for a malformed address or subnet.
        return f'Invalid address: {error}\n', 400

    return f'{lease_response}'
|
|
|
|
@app.route('/release', methods=['POST'])
def handle_release():
    """Release the address named in the JSON body.

    Expects a JSON object with ``ip`` and ``subnet`` fields.  Returns the
    status message produced by ``IPAM.release``.  A body that lacks those
    fields gets a 400 response instead of an unhandled exception.
    """
    if not request.is_json:
        return 'Data type not supported, supported data types: JSON\n'

    data = request.json
    try:
        ip, subnet = data['ip'], data['subnet']
    except (KeyError, TypeError):
        # KeyError: field missing; TypeError: body is not a JSON object.
        return 'Missing required fields: ip, subnet\n', 400

    release_response = IPAM.release(ip, subnet)
    return f'{release_response}'
|
|
|
|
if __name__ == '__main__':
    # NOTE(review): debug=True turns on Flask's debug mode (interactive
    # debugger and reloader); confirm this entry point is only used for
    # local development and never exposed to untrusted clients.
    app.run(debug=True)