Breaking functions into separate files, moving Discord notifications to embeds, adding shipment IDs & destinations to notifications

This commit is contained in:
Dom 2024-07-15 07:57:02 +01:00
parent b9fdb4825f
commit fe25701659
6 changed files with 210 additions and 134 deletions

View File

@ -1,144 +1,45 @@
import requests
import json
import yaml
import logging
from datetime import datetime, timedelta
from log import log
from timeOperations import isInboundShipmentPlanWithinSpecifiedDelta
from amazonAPI import getAccessToken, getInboundShipmentData, getInboundShipmentPlans, getInboundShipmentPlan, getProductName
from sentNotifications import isInboundShipmentPlanIDInSentNotifications, updateSentNotifications
from discordNotifications import sendDiscordNotification
SETTINGS = yaml.safe_load(open('settings.yaml'))
def parseInboundShipmentPlans():
log('\U0001F504 Getting inbound shipment plans...', 'info')
inboundShipmentPlans = getInboundShipmentPlans()
inboundShipmentPlanIDs = []
inboundShipmentPlanData = {}
def log(log_message, level):
logger = logging.getLogger('sn-logger')
for plan in inboundShipmentPlans:
if isInboundShipmentPlanWithinSpecifiedDelta(plan['createdAt']):
log('Adding inbound shipment plan to list: {}'.format(plan['inboundPlanId']), 'info')
inboundShipmentPlanIDs.append(plan['inboundPlanId'])
inboundShipmentPlanData.update({plan['inboundPlanId']: {'creationDate': plan['createdAt']}})
log_message_types = {
'debug': logger.debug,
'info': logger.info,
'warning': logger.warning,
'error': logger.error,
'critical': logger.critical
}
if inboundShipmentPlanIDs:
log('\U0001F440 Checking inbound shipment plans...', 'info')
log(f'Shipments to check: {len(inboundShipmentPlanIDs)}', 'info')
if not logger.handlers:
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(levelname)s] %(message)s')
file_handler = logging.FileHandler('logs/' + 'log_' + datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + '.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
log_message_types[level](log_message)
def updateSentNotifications(inboundPlanId):
with open('SentNotifications.json') as NotificationsSentJson:
NotificationsSent = json.load(NotificationsSentJson)
NotificationsSent['NotificationsSent'].append(inboundPlanId)
with open('SentNotifications.json', mode='w') as outputNotificationsSent:
outputNotificationsSent.write(json.dumps(NotificationsSent, indent=4))
def isIDInSentNotifications(inboundPlanId):
with open('SentNotifications.json') as NotificationsSentJson:
NotificationsSent = json.load(NotificationsSentJson)
if inboundPlanId in NotificationsSent['NotificationsSent']:
return True
else:
return False
def isShipmentWithinSpecifiedDelta(shipmentCreationTime, delta=360):
currentTime = datetime.now()
shipmentTime = datetime.strptime(shipmentCreationTime, '%Y-%m-%dT%H:%M:%SZ')
timeDelta = currentTime - shipmentTime
log(f'Current time: {currentTime}', 'info')
log(f'Shipment creation time: {shipmentTime}', 'info')
log(f'Time delta: {timeDelta}', 'info')
if timeDelta < timedelta(minutes=delta):
return True
else:
return False
def getAccessToken(settings=SETTINGS):
AccessToken = requests.post(
'https://api.amazon.com/auth/o2/token',
{
'grant_type': 'refresh_token',
'refresh_token': settings['REFRESH_TOKEN'],
'client_id': settings['CLIENT_ID'],
'client_secret': settings['CLIENT_SECRET'],
}
)
return AccessToken.json()['access_token']
def getProductName(SKU, settings=SETTINGS):
product = requests.get(
settings['SPAPI_ENDPOINT'] + '/listings/2021-08-01/items/' + settings['SELLER_ID'] + f'/{SKU}',
headers = {
'x-amz-access-token': getAccessToken(),
},
params = {
'marketplaceIds': settings['MARKETPLACE_ID']
},
)
return product.json()['summaries'][0]['itemName']
def sendDiscordNotification(settings=SETTINGS, content=None):
notification = {"content": content}
requests.post(settings['DISCORD_WEBHOOK'], json=notification)
def getInboundShipments(settings=SETTINGS):
InboundShipments = requests.get(
settings['SPAPI_ENDPOINT'] + '/inbound/fba/2024-03-20/inboundPlans?pageSize=10&sortBy=CREATION_TIME&sortOrder=DESC&status=SHIPPED',
headers = {
'x-amz-access-token': getAccessToken(),
}
)
return InboundShipments.json()['inboundPlans']
def parseInboundShipments(settings=SETTINGS):
log('\U0001F504 Getting shipments...', 'info')
InboundShipments = getInboundShipments()
inboundPlanIDs = []
shipmentData = {}
for shipment in InboundShipments:
if isShipmentWithinSpecifiedDelta(shipment['createdAt']):
log('Adding inbound plan to list: {}'.format(shipment['inboundPlanId']), 'info')
inboundPlanIDs.append(shipment['inboundPlanId'])
if inboundPlanIDs:
log('\U0001F440 Checking shipments...', 'info')
log(f'Shipments to check: {len(inboundPlanIDs)}', 'info')
for ID in inboundPlanIDs:
if isIDInSentNotifications(ID):
log(f'Ignoring {ID}, notification has already been sent', 'info')
elif not isIDInSentNotifications(ID):
getShipment = requests.get(
settings['SPAPI_ENDPOINT'] + f'/inbound/fba/2024-03-20/inboundPlans/{ID}/items',
headers = {
'x-amz-access-token': getAccessToken(),
}
)
if getShipment.json()['items']:
for inboundShipmentPlanID in inboundShipmentPlanIDs:
if isInboundShipmentPlanIDInSentNotifications(inboundShipmentPlanID):
log(f'Ignoring inbound shipment plan {inboundShipmentPlanID}, notification has already been sent', 'info')
elif not isInboundShipmentPlanIDInSentNotifications(inboundShipmentPlanID):
inboundShipmentPlan = getInboundShipmentPlan(inboundShipmentPlanID)
if inboundShipmentPlan['items']:
itemDict = {}
totalItemCount = 0
for item in getShipment.json()['items']:
for item in inboundShipmentPlan['items']:
productName = getProductName(item['msku'])
itemDict.update({productName: item.get('quantity')})
totalItemCount += item['quantity']
itemDict.update({'Total item count': totalItemCount})
shipmentData.update({ID: itemDict})
log(f'\U0001F514 Sending Discord notification for {ID}...', 'info')
newline = '\n'
sendDiscordNotification(content=f':package: New shipment detected :package:\nShipment contents:\n{newline.join(f"- {MSKU}: {Count}" for MSKU, Count in shipmentData[ID].items())}')
updateSentNotifications(ID)
itemDict.update({'Total unit count': totalItemCount})
inboundShipmentPlanData[inboundShipmentPlanID]['contents'] = itemDict
inboundShipmentData = getInboundShipmentData(inboundShipmentPlanID)
inboundShipmentPlanData[inboundShipmentPlanID]['destinations'] = inboundShipmentData[inboundShipmentPlanID]['destinations']
inboundShipmentPlanData[inboundShipmentPlanID]['shipmentIDs'] = inboundShipmentData[inboundShipmentPlanID]['shipmentIDs']
log(f'\U0001F514 Sending Discord notification for {inboundShipmentPlanID}...', 'info')
sendDiscordNotification(inboundShipmentPlanID, inboundShipmentPlanData[inboundShipmentPlanID])
updateSentNotifications(inboundShipmentPlanID)
parseInboundShipments()
if __name__ == '__main__':
parseInboundShipmentPlans()

78
amazonAPI.py Normal file
View File

@ -0,0 +1,78 @@
import requests
import json
import yaml
from log import log
from discordNotifications import sendDiscordNotification
from sentNotifications import isInboundShipmentPlanIDInSentNotifications, updateSentNotifications
from timeOperations import isInboundShipmentPlanWithinSpecifiedDelta
SETTINGS = yaml.safe_load(open('settings.yaml'))
def getAccessToken(settings=SETTINGS):
accessToken = requests.post(
'https://api.amazon.com/auth/o2/token',
{
'grant_type': 'refresh_token',
'refresh_token': settings['REFRESH_TOKEN'],
'client_id': settings['CLIENT_ID'],
'client_secret': settings['CLIENT_SECRET'],
}
)
return accessToken.json()['access_token']
def getProductName(MSKU, settings=SETTINGS):
product = requests.get(
settings['SPAPI_ENDPOINT'] + '/listings/2021-08-01/items/' + settings['SELLER_ID'] + f'/{MSKU}',
headers = {
'x-amz-access-token': getAccessToken(),
},
params = {
'marketplaceIds': settings['MARKETPLACE_ID']
},
)
return product.json()['summaries'][0]['itemName']
def getInboundShipmentPlans(settings=SETTINGS):
inboundShipmentPlans = requests.get(
settings['SPAPI_ENDPOINT'] + '/inbound/fba/2024-03-20/inboundPlans?pageSize=10&sortBy=CREATION_TIME&sortOrder=DESC&status=SHIPPED',
headers = {
'x-amz-access-token': getAccessToken(),
}
)
return inboundShipmentPlans.json()['inboundPlans']
def getInboundShipmentPlan(inboundShipmentPlanID, settings=SETTINGS):
inboundShipmentPlan = requests.get(
settings['SPAPI_ENDPOINT'] + f'/inbound/fba/2024-03-20/inboundPlans/{inboundShipmentPlanID}/items',
headers = {
'x-amz-access-token': getAccessToken(),
}
)
return inboundShipmentPlan.json()
def getInboundShipmentData(inboundPlanId, settings=SETTINGS):
inboundShipmentData = {inboundPlanId: {'destinations': [], 'shipmentIDs': []}}
inboundPlanShipmentData = requests.get(
settings['SPAPI_ENDPOINT'] + f'/inbound/fba/2024-03-20/inboundPlans/{inboundPlanId}/placementOptions',
headers = {
'x-amz-access-token': getAccessToken(),
}
)
for shipmentID in inboundPlanShipmentData.json()['placementOptions'][0]['shipmentIds']:
individualShipmentData = requests.get(
settings['SPAPI_ENDPOINT'] + f'/inbound/fba/2024-03-20/inboundPlans/{inboundPlanId}/shipments/{shipmentID}',
headers = {
'x-amz-access-token': getAccessToken(),
}
)
inboundShipmentData[inboundPlanId]['shipmentIDs'].append(individualShipmentData.json()['shipmentConfirmationId'])
inboundShipmentData[inboundPlanId]['destinations'].append(individualShipmentData.json()['destination']['warehouseId'])
return inboundShipmentData

36
discordNotifications.py Normal file
View File

@ -0,0 +1,36 @@
import yaml
import requests
SETTINGS = yaml.safe_load(open('settings.yaml'))
def sendDiscordNotification(*args, settings=SETTINGS):
newLine = '\n'
shipmentNotification = requests.post(
SETTINGS['DISCORD_WEBHOOK'],
json = {
"embeds": [
{
"title": ":package: New Inbound Shipment Detected :package:",
"url": f"https://sellercentral.amazon.co.uk/fba/sendtoamazon?wf={args[0]}",
"color": 4886754,
"fields": [
{
"name": "Shipment Information",
"value": f'''
> Inbound Shipment Plan ID: {args[0]}
> Creation Date: {args[1]['creationDate']}
> Destination Fulfilment Centres: {', '.join(destination for destination in args[1]['destinations'])}
> Shipments: {', '.join(f'[{shipmentID}](https://sellercentral.amazon.co.uk/fba/inbound-shipment/summary/{shipmentID}/contents)' for shipmentID in args[1]['shipmentIDs'])}
'''
},
{
"name": "Shipment Contents",
"value": f'''
{newLine.join(f"> {MSKU}: {Count}" for MSKU, Count in args[1]['contents'].items())}
'''
}
]
}
]
},
)

26
log.py Normal file
View File

@ -0,0 +1,26 @@
import logging
from datetime import datetime
def log(log_message, level):
logger = logging.getLogger('sn-logger')
log_message_types = {
'debug': logger.debug,
'info': logger.info,
'warning': logger.warning,
'error': logger.error,
'critical': logger.critical
}
if not logger.handlers:
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(levelname)s] %(message)s')
file_handler = logging.FileHandler('logs/' + 'log_' + datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + '.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
log_message_types[level](log_message)

19
sentNotifications.py Normal file
View File

@ -0,0 +1,19 @@
import json
def updateSentNotifications(inboundPlanId):
with open('SentNotifications.json') as sentNotificationsJSON:
sentNotifications = json.load(sentNotificationsJSON)
sentNotifications['sentNotifications'].append(inboundPlanId)
with open('SentNotifications.json', mode='w') as outputSentNotifications:
outputSentNotifications.write(json.dumps(sentNotifications, indent=4))
def isInboundShipmentPlanIDInSentNotifications(inboundShipmentPlanId):
with open('SentNotifications.json') as sentNotificationsJSON:
sentNotifications = json.load(sentNotificationsJSON)
if inboundShipmentPlanId in sentNotifications['sentNotifications']:
return True
else:
return False

16
timeOperations.py Normal file
View File

@ -0,0 +1,16 @@
from datetime import datetime, timedelta
from log import log
def isInboundShipmentPlanWithinSpecifiedDelta(inboundShipmentPlanCreationTime, delta=360):
currentTime = datetime.now()
inboundShipmentPlanTime = datetime.strptime(inboundShipmentPlanCreationTime, '%Y-%m-%dT%H:%M:%SZ')
timeDelta = currentTime - inboundShipmentPlanTime
log(f'Current time: {currentTime}', 'info')
log(f'Inbound shipment plan creation time: {inboundShipmentPlanTime}', 'info')
log(f'Time delta: {timeDelta}', 'info')
if timeDelta < timedelta(minutes=delta):
return True
else:
return False