discord_presence_pbx.py
· 1.1 KiB · Python
Raw
import socket
from requests import get
EXTEN = "1"
presence_api = "http://127.0.0.1:8081"
auth = ("me", "aa")
pbx_ip = "127.0.0.2"
def process_event(resp):
    """Parse one message block of "Name: value" lines into a dict.

    The block is stripped of surrounding whitespace and split on CRLF.
    Each line is split at its first ": " into a header name and a value;
    when a name repeats, the later value wins.

    Returns the dict of headers, or None as soon as a line without a
    ": " separator is found (this includes an empty block). If that
    offending line is not blank, the whole block is printed first so
    unexpected input is visible on stdout.
    """
    fields = {}
    for raw_line in resp.strip().split("\r\n"):
        key, separator, value = raw_line.partition(": ")
        if not separator:
            # Not a "Name: value" line: reject the entire block, echoing it
            # only when the line actually carries some text.
            if raw_line.strip():
                print(resp)
            return None
        fields[key] = value
    return fields
# Connect to the PBX manager port (5038 -- presumably the Asterisk Manager
# Interface; confirm against the PBX configuration), log in with `auth`, and
# forward status changes of extension EXTEN to the presence API.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((pbx_ip, 5038))
    s.sendall("\r\nAction: login\r\nUsername: {0}\r\nSecret: {1}\r\n\r\n".format(*auth).encode("utf-8"))
    # Bytes received so far that do not yet form a complete message.
    pending = b""
    while True:
        chunk = s.recv(1024)
        if not chunk:
            # recv() returns b"" once the peer has closed the connection;
            # without this check the loop would spin forever on empty reads.
            break
        pending += chunk
        # Messages end with a blank line. TCP may deliver a message in
        # several pieces, so only the complete ones are parsed now and the
        # unterminated tail is kept for the next recv().
        *messages, pending = pending.split(b"\r\n\r\n")
        for raw in messages:
            # Decode per complete message so a multi-byte character split
            # across two reads is not corrupted; undecodable bytes are
            # replaced instead of aborting the whole monitor.
            resp = process_event(raw.decode("utf-8", errors="replace"))
            if not resp:
                continue
            # Failures are reported as a "Response: Error" header; the
            # original check for a header literally named "Error" is kept
            # for backward compatibility.
            if "Error" in resp or resp.get("Response") == "Error":
                print(resp)
            # .get() keeps an event that lacks Exten/Status from raising
            # KeyError and killing the loop.
            if resp.get("Event") == "ExtensionStatus" and resp.get("Exten") == EXTEN:
                print(resp)
                status = resp.get("Status")
                if status == "1":
                    get(presence_api + "/_calls")
                elif status == "0":
                    get(presence_api + "/_something")
| 1 | import socket |
| 2 | from requests import get |
| 3 | |
| 4 | EXTEN = "1" |
| 5 | presence_api = "http://127.0.0.1:8081" |
| 6 | auth = ("me", "aa") |
| 7 | pbx_ip = "127.0.0.2" |
| 8 | |
| 9 | def process_event(resp): |
| 10 | d = {} |
| 11 | for line in resp.strip().split("\r\n"): |
| 12 | try: |
| 13 | name,data = line.split(": ", 1) |
| 14 | except ValueError: |
| 15 | if line.strip(): print(resp) |
| 16 | return None |
| 17 | d[name] = data |
| 18 | return d |
| 19 | |
| 20 | with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: |
| 21 | s.connect((pbx_ip,5038)) |
| 22 | s.sendall("\r\nAction: login\r\nUsername: {0}\r\nSecret: {1}\r\n\r\n".format(*auth).encode("utf-8")) |
| 23 | while True: |
| 24 | data = s.recv(1024).decode("utf-8") |
| 25 | for resp in data.split("\r\n\r\n"): |
| 26 | resp = process_event(resp) |
| 27 | if not resp: |
| 28 | continue |
| 29 | if "Error" in resp: |
| 30 | print(resp) |
| 31 | if "Event" in resp and resp["Event"] == "ExtensionStatus" and resp["Exten"] == EXTEN: |
| 32 | print(resp) |
| 33 | if resp["Status"] == "1": |
| 34 | get(presence_api + "/_calls") |
| 35 | elif resp["Status"] == "0": |
| 36 | get(presence_api + "/_something") |