Last active 1670065464

connects to an asterisk manager and picks up ExtensionStatus messages. ties into https://gist.github.com/blha303/37e83f320b009de19e7a6c140f51e8d5

S Smith revised this gist 1552876695. Go to revision

1 file changed, 13 insertions, 13 deletions

discord_presence_pbx.py

@@ -1,36 +1,36 @@
1 1 import socket
2 2 from requests import get
3 3
4 - hints_exten = "115"
5 - presence_api = "http://10.1.1.2:8081"
6 -
7 - pbx_ip = "10.1.1.1"
8 - port = 5038
9 - buffersize = 1024
4 + EXTEN = "1"
5 + presence_api = "http://127.0.0.1:8081"
6 + auth = ("me", "aa")
7 + pbx_ip = "127.0.0.2"
10 8
11 9 def process_event(resp):
12 10 d = {}
13 - for line in resp.split("\r\n"):
11 + for line in resp.strip().split("\r\n"):
14 12 try:
15 13 name,data = line.split(": ", 1)
16 14 except ValueError:
17 - print(resp)
15 + if line.strip(): print(resp)
18 16 return None
19 17 d[name] = data
20 18 return d
21 19
22 20 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
23 - s.connect((pbx_ip,port))
24 - s.sendall(b"\r\nAction: login\r\nUsername: me\r\nSecret: aa\r\n\r\n")
21 + s.connect((pbx_ip,5038))
22 + s.sendall("\r\nAction: login\r\nUsername: {0}\r\nSecret: {1}\r\n\r\n".format(*auth).encode("utf-8"))
25 23 while True:
26 - data = s.recv(buffersize).decode("utf-8")
24 + data = s.recv(1024).decode("utf-8")
27 25 for resp in data.split("\r\n\r\n"):
28 26 resp = process_event(resp)
29 27 if not resp:
30 28 continue
31 - print(resp)
29 + if "Error" in resp:
30 + print(resp)
32 31 if "Event" in resp and resp["Event"] == "ExtensionStatus" and resp["Exten"] == EXTEN:
32 + print(resp)
33 33 if resp["Status"] == "1":
34 34 get(presence_api + "/_calls")
35 35 elif resp["Status"] == "0":
36 - get(presence_api + "/_clear")
36 + get(presence_api + "/_something")

S Smith revised this gist 1552875705. Go to revision

1 file changed, 36 insertions

discord_presence_pbx.py(file created)

@@ -0,0 +1,36 @@
1 + import socket
2 + from requests import get
3 +
4 + hints_exten = "115"
5 + presence_api = "http://10.1.1.2:8081"
6 +
7 + pbx_ip = "10.1.1.1"
8 + port = 5038
9 + buffersize = 1024
10 +
11 + def process_event(resp):
12 + d = {}
13 + for line in resp.split("\r\n"):
14 + try:
15 + name,data = line.split(": ", 1)
16 + except ValueError:
17 + print(resp)
18 + return None
19 + d[name] = data
20 + return d
21 +
22 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
23 + s.connect((pbx_ip,port))
24 + s.sendall(b"\r\nAction: login\r\nUsername: me\r\nSecret: aa\r\n\r\n")
25 + while True:
26 + data = s.recv(buffersize).decode("utf-8")
27 + for resp in data.split("\r\n\r\n"):
28 + resp = process_event(resp)
29 + if not resp:
30 + continue
31 + print(resp)
32 + if "Event" in resp and resp["Event"] == "ExtensionStatus" and resp["Exten"] == EXTEN:
33 + if resp["Status"] == "1":
34 + get(presence_api + "/_calls")
35 + elif resp["Status"] == "0":
36 + get(presence_api + "/_clear")
Newer Older