import sys, os
import json
import requests
import asyncio
import httpx
from .errors import *
class EnlayClient:
    """Synchronous client for the Enlay HTTP API.

    Requests are sent with the ``requests`` library to
    ``https://api.enlay.io/`` and carry the API key in the
    ``Authorization`` header.
    """

    _BASE_URL = 'https://api.enlay.io/'

    def __init__(self, key):
        """Store the API key that is sent with every request."""
        self.key = key

    @staticmethod
    def _raise_for_error_code(payload):
        """Raise the package exception matching an error payload, if any.

        Only JSON objects carrying a ``code`` key are treated as errors;
        lists and any other JSON value pass through untouched.
        """
        if not isinstance(payload, dict) or 'code' not in payload:
            return
        code = payload['code']
        message = payload['message']
        if code == 'not authorized':
            raise NotAuthorized(message)
        if code == 'not found':
            raise NotFound(message)
        raise OtherError(message)

    def send_request(self, endpoint: str, post: bool = False, jsondata: bool = True, retasjson: bool = True, hasresponse: bool = True, data: dict = None):
        """Send a request to the API and return its result.

        Args:
            endpoint: The API endpoint, appended to the base URL.
            post: Send a POST request when True, otherwise a GET request.
            jsondata: Post ``data`` as a JSON body when True, otherwise as
                form data.
            retasjson: Return the decoded JSON body when True, otherwise the
                raw response text.
            hasresponse: Whether the request should receive a response. When
                False the body is not inspected and True is returned.
            data: Data to send with a POST request. ``None`` (the default)
                or an empty dict sends no body.

        Returns:
            The decoded JSON body, the response text, or True, depending on
            ``retasjson`` and ``hasresponse``.

        Raises:
            NotAuthorized: The response carries the code ``not authorized``.
            NotFound: The response carries the code ``not found``.
            OtherError: The response carries any other ``code``.
        """
        url = self._BASE_URL + endpoint
        headers = {'Authorization': self.key}

        if not post:
            response = requests.get(url, headers=headers)
        elif data is None or data == {}:
            # Nothing to send: post without a body.
            response = requests.post(url, headers=headers)
        elif jsondata:
            response = requests.post(url, json=data, headers=headers)
        else:
            response = requests.post(url, data=data, headers=headers)

        if not hasresponse:
            return True

        # Decode the body once; it is needed for the error check even when
        # the caller asked for the raw text.
        payload = response.json()
        self._raise_for_error_code(payload)
        return payload if retasjson else response.text

    def get_advertisers(self, limit: int = 20):
        """Get a list of advertisers.

        Args:
            limit: Limit of advertisers to return.
        """
        return self.send_request(f'advertisers?limit={limit}')

    def create_placements(self, id: str, unique: bool = True, max: int = 3):
        """Get a list of placements for a slot.

        Args:
            id: Slot ID.
            unique: Specifies if placements are unique.
            max: Max placements.
        """
        return self.send_request(f'slots/{id}/placements', post=True, data={"max": max, "unique": unique})

    def click_placement(self, id: str, redirect_url: str = None):
        """Add a click to a placement.

        Args:
            id: Placement ID.
            redirect_url: Link redirect URL. Pass the plain URL; it is
                percent-encoded here before being placed in the query string.

        Returns:
            True, since no response body is read for this request.
        """
        from urllib.parse import urlencode

        endpoint = f'p/{id}/c'
        if redirect_url:
            # Encode the value so that '?', '&' or '#' inside the target URL
            # cannot be mistaken for part of this request's own query string.
            endpoint += '?' + urlencode({'redirect_url': redirect_url})
        return self.send_request(endpoint, hasresponse=False)

    def view_placement(self, placements: list):
        """Add an impression to a placement.

        Args:
            placements: List of placement IDs
                (``[{'id': '6987923797392924672'}, ...]``).

        Returns:
            True, since no response body is read for this request.
        """
        return self.send_request('p/v', post=True, data=placements, hasresponse=False)
class EnlayClientAsync:
    """Asynchronous client for the Enlay HTTP API.

    Requests are sent with ``httpx.AsyncClient`` to
    ``https://api.enlay.io/`` and carry the API key in the
    ``Authorization`` header.
    """

    _BASE_URL = 'https://api.enlay.io/'

    def __init__(self, key):
        """Store the API key that is sent with every request."""
        self.key = key

    @staticmethod
    def _raise_for_error_code(payload):
        """Raise the package exception matching an error payload, if any.

        Only JSON objects carrying a ``code`` key are treated as errors;
        lists and any other JSON value pass through untouched.
        """
        if not isinstance(payload, dict) or 'code' not in payload:
            return
        code = payload['code']
        message = payload['message']
        if code == 'not authorized':
            raise NotAuthorized(message)
        if code == 'not found':
            raise NotFound(message)
        raise OtherError(message)

    async def send_request(self, endpoint: str, post: bool = False, jsondata: bool = True, retasjson: bool = True, hasresponse: bool = True, data: dict = None):
        """Send a request to the API and return its result.

        Args:
            endpoint: The API endpoint, appended to the base URL.
            post: Send a POST request when True, otherwise a GET request.
            jsondata: Post ``data`` as a JSON body when True, otherwise as
                form data.
            retasjson: Return the decoded JSON body when True, otherwise the
                raw response text.
            hasresponse: Whether the request should receive a response. When
                False the body is not inspected and True is returned.
            data: Data to send with a POST request. ``None`` (the default)
                or an empty dict sends no body.

        Returns:
            The decoded JSON body, the response text, or True, depending on
            ``retasjson`` and ``hasresponse``.

        Raises:
            httpx.HTTPStatusError: The response has an error status
                (raised by ``raise_for_status()``).
            NotAuthorized: The response carries the code ``not authorized``.
            NotFound: The response carries the code ``not found``.
            OtherError: The response carries any other ``code``.
        """
        url = self._BASE_URL + endpoint
        headers = {'Authorization': self.key}

        # The context manager closes the client on every path, including
        # when the request or the status check raises.
        async with httpx.AsyncClient() as client:
            if not post:
                response = await client.get(url, headers=headers)
            elif data is None or data == {}:
                # Nothing to send: post without a body.
                response = await client.post(url, headers=headers)
            elif jsondata:
                response = await client.post(url, json=data, headers=headers)
            else:
                response = await client.post(url, data=data, headers=headers)
            response.raise_for_status()

        if not hasresponse:
            return True

        # Decode the body once; it is needed for the error check even when
        # the caller asked for the raw text.
        payload = response.json()
        self._raise_for_error_code(payload)
        return payload if retasjson else response.text

    async def get_advertisers(self, limit: int = 20):
        """Get a list of advertisers.

        Args:
            limit: Limit of advertisers to return.
        """
        return await self.send_request(f'advertisers?limit={limit}')

    async def create_placements(self, id: str, unique: bool = True, max: int = 3):
        """Get a list of placements for a slot.

        Args:
            id: Slot ID.
            unique: Specifies if placements are unique.
            max: Max placements.
        """
        # NOTE(review): this posts the options form-encoded (jsondata=False),
        # whereas EnlayClient.create_placements posts them as JSON - confirm
        # which encoding the API expects.
        return await self.send_request(f'slots/{id}/placements', post=True, jsondata=False, data={"max": max, "unique": unique})

    async def click_placement(self, id: str, redirect_url: str = None):
        """Add a click to a placement.

        Args:
            id: Placement ID.
            redirect_url: Link redirect URL. Pass the plain URL; it is
                percent-encoded here before being placed in the query string.

        Returns:
            True, since no response body is read for this request.
        """
        from urllib.parse import urlencode

        endpoint = f'p/{id}/c'
        if redirect_url:
            # Encode the value so that '?', '&' or '#' inside the target URL
            # cannot be mistaken for part of this request's own query string.
            endpoint += '?' + urlencode({'redirect_url': redirect_url})
        return await self.send_request(endpoint, hasresponse=False)

    async def view_placement(self, placements: list):
        """Add an impression to a placement.

        Args:
            placements: List of placement IDs
                (``[{'id': '6987923797392924672'}, ...]``).

        Returns:
            True, since no response body is read for this request.
        """
        return await self.send_request('p/v', post=True, data=placements, hasresponse=False)