get_jackbox_room_code.py
· 1.7 KiB · Python
Raw
# a function to listen for the current jackbox room id being received
# requirements: pyshark python module, tshark library or wireshark
# sometimes the code gets cut off between packets. need to restart the current game if no code is found; this happens 10% of the time
def get_jackbox_room_code(packet_count=None):
""" blocks until room code is found or packet_count is reached
>>> get_jackbox_room_code()
'CTLE' """
def get_outgoing_ip():
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
roomcode = ""
c = pyshark.LiveCapture(display_filter="websocket")
for packet in c.sniff_continuously(packet_count):
if packet.ip.addr[0] != get_outgoing_ip():
try:
d = packet.websocket.payload_unknown.replace(":", "").decode('hex')
if '"roomId":"' in d:
roomcode = d.split('"roomId":"')[1].split('"')[0]
elif '","blob":{"state":"Lobby_WaitingForMore"}}' in d:
roomcode = d.split('","blob":{"state":"Lobby_WaitingForMore"}}')[0].split('"')[1]
except (AttributeError, IndexError):
pass
try:
d = packet.data.tcp_reassembled_data.replace(":", "").decode('hex')
if '"roomId":"' in d:
roomcode = d.split('"roomId":"')[1].split('"')[0]
elif '","blob":{"state":"Lobby_WaitingForMore"}}' in d:
roomcode = d.split('","blob":{"state":"Lobby_WaitingForMore"}}')[0].split('"')[1]
except (AttributeError, IndexError):
pass
if roomcode and len(roomcode) == 4:
break
return roomcode
| 1 | # a function to listen for the current jackbox room id being received |
| 2 | # requirements: pyshark python module, tshark library or wireshark |
| 3 | # sometimes the code gets cut off between packets. need to restart the current game if no code is found; this happens 10% of the time |
| 4 | def get_jackbox_room_code(packet_count=None): |
| 5 | """ blocks until room code is found or packet_count is reached |
| 6 | >>> get_jackbox_room_code() |
| 7 | 'CTLE' """ |
| 8 | def get_outgoing_ip(): |
| 9 | import socket |
| 10 | s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 11 | s.connect(("8.8.8.8", 80)) |
| 12 | return s.getsockname()[0] |
| 13 | roomcode = "" |
| 14 | c = pyshark.LiveCapture(display_filter="websocket") |
| 15 | for packet in c.sniff_continuously(packet_count): |
| 16 | if packet.ip.addr[0] != get_outgoing_ip(): |
| 17 | try: |
| 18 | d = packet.websocket.payload_unknown.replace(":", "").decode('hex') |
| 19 | if '"roomId":"' in d: |
| 20 | roomcode = d.split('"roomId":"')[1].split('"')[0] |
| 21 | elif '","blob":{"state":"Lobby_WaitingForMore"}}' in d: |
| 22 | roomcode = d.split('","blob":{"state":"Lobby_WaitingForMore"}}')[0].split('"')[1] |
| 23 | except (AttributeError, IndexError): |
| 24 | pass |
| 25 | try: |
| 26 | d = packet.data.tcp_reassembled_data.replace(":", "").decode('hex') |
| 27 | if '"roomId":"' in d: |
| 28 | roomcode = d.split('"roomId":"')[1].split('"')[0] |
| 29 | elif '","blob":{"state":"Lobby_WaitingForMore"}}' in d: |
| 30 | roomcode = d.split('","blob":{"state":"Lobby_WaitingForMore"}}')[0].split('"')[1] |
| 31 | except (AttributeError, IndexError): |
| 32 | pass |
| 33 | if roomcode and len(roomcode) == 4: |
| 34 | break |
| 35 | return roomcode |