131 lines
4.8 KiB
Python
131 lines
4.8 KiB
Python
import requests
|
|
import json
|
|
import yaml
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
SETTINGS = yaml.safe_load(open('settings.yaml'))
|
|
|
|
def log(log_message, level):
|
|
logger = logging.getLogger('sn-logger')
|
|
|
|
log_message_types = {
|
|
'debug': logger.debug,
|
|
'info': logger.info,
|
|
'warning': logger.warning,
|
|
'error': logger.error,
|
|
'critical': logger.critical
|
|
}
|
|
|
|
if not logger.handlers:
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
formatter = logging.Formatter('[%(levelname)s] %(message)s')
|
|
|
|
file_handler = logging.FileHandler('logs/' + 'log_' + datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + '.log')
|
|
file_handler.setLevel(logging.DEBUG)
|
|
file_handler.setFormatter(formatter)
|
|
|
|
logger.addHandler(file_handler)
|
|
|
|
log_message_types[level](log_message)
|
|
|
|
def updateSentNotifications(inboundPlanId):
|
|
with open('SentNotifications.json') as NotificationsSentJson:
|
|
NotificationsSent = json.load(NotificationsSentJson)
|
|
|
|
NotificationsSent['NotificationsSent'].append(inboundPlanId)
|
|
|
|
with open('SentNotifications.json', mode='w') as outputNotificationsSent:
|
|
outputNotificationsSent.write(json.dumps(NotificationsSent, indent=4))
|
|
|
|
def isIDInSentNotifications(inboundPlanId):
|
|
with open('SentNotifications.json') as NotificationsSentJson:
|
|
NotificationsSent = json.load(NotificationsSentJson)
|
|
|
|
if inboundPlanId in NotificationsSent['NotificationsSent']:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def isShipmentWithinSpecifiedDelta(shipmentCreationTime):
|
|
currentTime = datetime.now()
|
|
shipmentTime = datetime.strptime(shipmentCreationTime, '%Y-%m-%dT%H:%M:%SZ')
|
|
timeDelta = currentTime - shipmentTime
|
|
|
|
log(f'Current time: {currentTime}', 'info')
|
|
log(f'Shipment creation time: {shipmentTime}', 'info')
|
|
log(f'Time delta: {timeDelta}', 'info')
|
|
|
|
if timeDelta < timedelta(minutes=360):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def getAccessToken(settings=SETTINGS):
|
|
AccessToken = requests.post(
|
|
'https://api.amazon.com/auth/o2/token',
|
|
{
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': settings['REFRESH_TOKEN'],
|
|
'client_id': settings['CLIENT_ID'],
|
|
'client_secret': settings['CLIENT_SECRET'],
|
|
}
|
|
)
|
|
|
|
return AccessToken.json()['access_token']
|
|
|
|
def sendDiscordNotification(settings=SETTINGS, content=None):
|
|
notification = {"content": content}
|
|
requests.post(settings['DISCORD_WEBHOOK'], json=notification)
|
|
|
|
def getInboundShipments(settings=SETTINGS):
|
|
InboundShipments = requests.get(
|
|
settings['SPAPI_ENDPOINT'] + '/inbound/fba/2024-03-20/inboundPlans?pageSize=10&sortBy=CREATION_TIME&sortOrder=DESC&status=SHIPPED',
|
|
headers = {
|
|
'x-amz-access-token': getAccessToken(),
|
|
}
|
|
)
|
|
|
|
return InboundShipments.json()['inboundPlans']
|
|
|
|
def parseInboundShipments(settings=SETTINGS):
|
|
log('\U0001F504 Getting shipments...', 'info')
|
|
InboundShipments = getInboundShipments()
|
|
inboundPlanIDs = []
|
|
shipmentData = {}
|
|
|
|
for shipment in InboundShipments:
|
|
log('Got shipment creation date: {}'.format(shipment['createdAt']), 'info')
|
|
if isShipmentWithinSpecifiedDelta(shipment['createdAt']):
|
|
log('Adding inbound plan to list: {}'.format(shipment['inboundPlanId']), 'info')
|
|
inboundPlanIDs.append(shipment['inboundPlanId'])
|
|
|
|
if inboundPlanIDs:
|
|
log('\U0001F440 Checking shipments...', 'info')
|
|
log(f'Shipments to check: {len(inboundPlanIDs)}', 'info')
|
|
|
|
for ID in inboundPlanIDs:
|
|
if isIDInSentNotifications(ID):
|
|
log(f'Ignoring {ID}, notification has already been sent', 'info')
|
|
elif not isIDInSentNotifications(ID):
|
|
getShipment = requests.get(
|
|
settings['SPAPI_ENDPOINT'] + f'/inbound/fba/2024-03-20/inboundPlans/{ID}/items',
|
|
headers = {
|
|
'x-amz-access-token': getAccessToken(),
|
|
}
|
|
)
|
|
if getShipment.json()['items']:
|
|
itemDict = {}
|
|
totalItemCount = 0
|
|
for item in getShipment.json()['items']:
|
|
itemDict.update({item.get('msku'): item.get('quantity')})
|
|
totalItemCount += item['quantity']
|
|
itemDict.update({'Total item count': totalItemCount})
|
|
shipmentData.update({ID: itemDict})
|
|
log(f'\U0001F514 Sending Discord notification for {ID}...', 'info')
|
|
newline = '\n'
|
|
sendDiscordNotification(content=f':package: New shipment detected :package:\nShipment contents:\n{newline.join(f"- {MSKU}: {Count}" for MSKU, Count in shipmentData[ID].items())}')
|
|
updateSentNotifications(ID)
|
|
|
|
parseInboundShipments() |