Source code for netinfo
#!/usr/bin/env python
"""Abstract API over the Netinfo API."""
import json
import logging
import requests
__author__ = "Brandon Dixon"
__copyright__ = "Copyright, Netinfo"
__credits__ = ["Brandon Dixon"]
__license__ = "MIT"
__maintainer__ = "Brandon Dixon"
__email__ = "brandon@backscatter.io"
__status__ = "BETA"
class RequestFailure(Exception):
"""Exception to capture a failed request."""
pass
class InvalidResponse(Exception):
"""Exception to capture a failed response parse."""
pass
[docs]class Netinfo:
"""Abstract interface for Netinfo."""
NAME = "Netinfo"
def __init__(self, url):
"""Init the object."""
self.url = url
self.type_map = {
'ip': 'lookup',
'network': 'network-addresses',
'as': 'as',
'port': 'port'
}
[docs] def _request(self, endpoint, params=dict(), data=None):
"""Handle the requesting of information from the API."""
client_value = "Python Netinfo"
headers = {'X-Request-Client': client_value}
url = '/'.join([self.url, endpoint])
kwargs = {'url': url, 'headers': headers, 'timeout': 30,
'params': params, 'data': data}
response = requests.get(**kwargs)
if response.status_code not in range(200, 299):
raise RequestFailure(response.status_code, response.content)
try:
loaded = json.loads(response.content)
except Exception as error:
raise InvalidResponse(error)
return loaded
[docs] def enrich(self, query, query_type):
"""Enrich data based on the type."""
to_call = self.type_map.get(query_type, None)
if not to_call:
raise RequestFailure("Query type is not supported.")
if query_type == 'ip':
params = {'ip': query}
elif query_type == 'network':
params = {'cidr': query}
elif query_type == 'port':
params = {'port': query}
else:
params = {'asn': query}
return self._request(to_call, params=params)