Tuesday, December 14, 2021

Pagerduty - event API - trigger/resolve incident

class PagerDutyAlert(object):
def __init__(self):
super(PagerDutyAlert, self).__init__()

def _trigger_incident(self, alert):
self._alert = alert

self._source = self._alert['Source']
self._service = self._alert['Service']
self._workflow_name = self._alert['WorkflowName']
self._workflow_status = self._alert['WorkflowStatus']
self._workflow_level = self._alert['Level']
self._workflow_env = self._alert['Environment']
self._team = self._alert['Team']
self._summary = self._alert['Summary']
self._severity = self._alert['Severity']

"""Triggers an incident via the V2 REST API."""
self.routing_key = xxxxxxxx
self.FROM = xxxxe-mailxxxxx
headers = {
'Content-Type': 'application/json',
'Accept': 'application/vnd.pagerduty+json;version=2',
'From': self.FROM
}

url = 'https://events.pagerduty.com/v2/enqueue'
custom_details = {"Source": self._source,
"Service": self._service,
"WorkflowName": self._workflow_name,
"Summary": self._summary,
"Level": self._workflow_level,
"Environment": self._workflow_env,
"Team": self._team,
"Time": str(int(time.time()))
}
obj = {
'routing_key': self.routing_key,
"event_action": "trigger",
"payload": {
"summary": "Workflow: " + self._workflow_name + " Failed!",
"custom_details": custom_details,
"severity": self._severity,
"source": self._source
}
}
r = requests.post(url, headers=headers, data=json.dumps(obj))

print('Status Code: {code}'.format(code=r.status_code))
print(r.json())

def _resolve_incident(self, dedup_key):
header = {
"Content-Type": "application/json"
}

payload = {
"routing_key": self.routing_key,
"event_action": "resolve",
"dedup_key": dedup_key
}

response = requests.post('https://events.pagerduty.com/v2/enqueue',
data=json.dumps(payload),
headers=header)

if response.json()["status"] == "success":
print('Incident Resolved ')
else:
print(response.text) # print error message if not successful

def _run(self, alert):

if alert:
self._trigger_incident(alert)
else:
raise ValueError("invalid json_params!")

def run_job(self, string_params):
print("string_params", string_params)
if type(string_params) is dict:
conf = string_params
else:
conf = json.loads(string_params)

self._run(**conf)


if __name__ == '__main__':
fire.Fire(PagerDutyAlert())

No comments:

Post a Comment