FT8 Telegram Bots

For FT8 operations it is sometimes desirable to get a notification once a specific station you call answers your call. Maybe you do not want to spend the whole time staring at the screen waiting for the magic red line to appear. So for this use case I made a little Python script that feeds a Telegram bot with the message once there is a transmission addressed to me.

It requires python3 and the telegram-bot libs installed. The latter can be achieved with a simple pip install:

$ pip install python-telegram-bot

The script listens for incoming WSJT-X UDP messages on localhost port 2237 (default settings of WSJT-X). The messages are parsed, and once one contains my call right at the beginning, it fires off a Telegram message using a bot (which has to be created prior to that). Configure the script with your bot credentials, the chat id to send to and the callsign you want to monitor.

The code looks like this:

#!/usr/bin/env python3
import re
import socket
import telegram
import asyncio

from struct import *

# --- Telegram: bot credentials and the chat that receives notifications ----
botToken = '123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11'
chatId = 123456789

# Callsign to watch for at the start of decoded messages.
myCall = 'YOURCALL'

# --- WSJT-X: UDP endpoint the script listens on -----------------------------
localIP = '127.0.0.1'
localPort = 2237
# Upper bound (in bytes) handed to recvfrom() for a single datagram.
bufferSize = 1024

# Datagram (UDP) socket bound to the endpoint configured above.
rxsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rxsock.bind((localIP, localPort))

async def send(msg, chatId, token=botToken):
    """Send ``msg`` to a Telegram chat as a monospaced (inline code) message.

    Args:
        msg: Plain text to deliver.
        chatId: Identifier of the Telegram chat that receives the message.
        token: Bot API token; defaults to the module-level ``botToken``.
    """
    # Inside a MarkdownV2 code span only '\' and '`' are special. Escape them
    # so that arbitrary text cannot break the markup.
    escaped = msg.replace('\\', '\\\\').replace('`', '\\`')
    # The context manager initializes the bot and shuts it down again, so its
    # HTTP client is not left open after every notification.
    async with telegram.Bot(token=token) as bot:
        await bot.send_message(chat_id=chatId, text='`' + escaped + '`', parse_mode='MarkdownV2')

# Magic number that opens every WSJT-X datagram.
_WSJTX_MAGIC = b'\xad\xbc\xcb\xda'

# Message type number of a "decode" datagram.
_DECODE_MESSAGE = 2

# Length marker of a null (absent) string; it is not followed by any payload.
_NULL_LENGTH = 0xffffffff

# One-character mode markers mapped to readable mode names.
_MODE_NAMES = {
    '~': 'FT8',
    '+': 'FT4',
    '$': 'JT4',
    '#': 'JT65',
    '&': 'MSK144',
    '@': 'JT9',
}


def _read_fixed(buf, pos, fmt, size):
    """Unpack one fixed-size field at ``pos`` and return ``(value, next_pos)``.

    Raises:
        ValueError: fewer than ``size`` bytes remain in ``buf``.
    """
    end = pos + size
    if end > len(buf):
        raise ValueError('truncated WSJT-X datagram')
    return unpack(fmt, buf[pos:end])[0], end


def _read_utf8(buf, pos):
    """Read a length-prefixed UTF-8 string and return ``(text, next_pos)``.

    Raises:
        ValueError: the field runs past the end of ``buf`` or is not valid
            UTF-8 (``UnicodeDecodeError`` is a ``ValueError``).
    """
    size, pos = _read_fixed(buf, pos, '>I', 4)
    if size == _NULL_LENGTH:
        return '', pos
    end = pos + size
    if end > len(buf):
        raise ValueError('truncated WSJT-X datagram')
    return buf[pos:end].decode('utf-8'), end


def _parse_decode(datagram):
    """Parse a WSJT-X "decode" datagram.

    Returns:
        A dict with the keys ``new``, ``time`` (``HHMMSSz`` string), ``snr``,
        ``dt``, ``df``, ``mode`` and ``message``, or ``None`` when the
        datagram does not start with the WSJT-X magic number or carries a
        different message type.

    Raises:
        ValueError: the datagram is truncated or contains invalid UTF-8.
    """
    if datagram[:4] != _WSJTX_MAGIC:
        return None
    pos = 4
    _scheme, pos = _read_fixed(datagram, pos, '>I', 4)
    messagetype, pos = _read_fixed(datagram, pos, '>I', 4)
    if messagetype != _DECODE_MESSAGE:
        return None

    _sender_id, pos = _read_utf8(datagram, pos)
    new, pos = _read_fixed(datagram, pos, '?', 1)
    millis, pos = _read_fixed(datagram, pos, '>I', 4)
    snr, pos = _read_fixed(datagram, pos, '>i', 4)
    dt, pos = _read_fixed(datagram, pos, '>d', 8)
    df, pos = _read_fixed(datagram, pos, '>I', 4)
    mode, pos = _read_utf8(datagram, pos)
    message, pos = _read_utf8(datagram, pos)
    # The flags that follow the message text are not needed and are left
    # unread, so datagrams that lack them are still accepted.

    # Integer arithmetic keeps the seconds field from ever rounding up to 60.
    hours, rest = divmod(millis // 1000, 3600)
    minutes, seconds = divmod(rest, 60)
    return {
        'new': new,
        'time': '{0:02d}{1:02d}{2:02d}z'.format(hours, minutes, seconds),
        'snr': snr,
        'dt': dt,
        'df': df,
        'mode': _MODE_NAMES.get(mode, mode),
        'message': message,
    }


# Listen forever and forward every decode that starts with the monitored call.
while True:
    data, _ = rxsock.recvfrom(bufferSize)
    try:
        decode = _parse_decode(data)
    except ValueError:
        # A malformed datagram must not stop the monitor; skip it.
        continue
    if decode is None:
        continue
    # Literal prefix comparison: the callsign is not interpreted as a regular
    # expression.
    if decode['message'].startswith(myCall):
        asyncio.run(send(decode['message'], chatId, botToken))

The code parses the WSJT-X UDP messages “by hand” and stores several variables. I was just too lazy to remove all that stuff. And there are probably some libraries out there to do the job… 8-)

The result is shown in a QSO on 20m FT8 with IS0FAP:

WSJT-X QSO with IS0FAP
WSJT-X QSO with IS0FAP

The following screenshot shows the output of the bot (previous messages stem from other experiments and are to be ignored):

Telegram Message Received
Telegram Message Received

Check Cloudlog for new Initials

For QO-100 activity in FT8 there is a proof of concept of the code that checks my (Cloud)log for existing QSOs and sends a message in case a new initial is calling CQ. Checking Cloudlog is done via its API, which has been implemented recently. This only works if you have a public slug for the logbook you want to search as well as an API key with at least read-only access.

#!/usr/bin/env python3
import re
import socket
from struct import *
import requests
import telegram
import asyncio

# --- Cloudlog: API access and the logbook/band to search --------------------
url = "https://CLOUDLOG.BASE.URL/index.php/api/logbook_check_callsign"
api_key = "API_KEY"
public_slug = "PUBLIC_SLUG"
band = "SAT"

# --- Telegram: bot token and the chat that receives notifications -----------
my_token = '123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11'
chat_id = 123456789

# --- WSJT-X: UDP endpoint the script listens on ------------------------------
localIP = "127.0.0.1"
localPort = 2237
# Upper bound (in bytes) handed to recvfrom() for a single datagram.
bufferSize = 1024

# Datagram (UDP) socket bound to the endpoint configured above.
rxsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rxsock.bind((localIP, localPort))

async def send(msg, chat_id, token=my_token):
    """Send ``msg`` to a Telegram chat as a monospaced (inline code) message.

    Args:
        msg: Plain text to deliver.
        chat_id: Identifier of the Telegram chat that receives the message.
        token: Bot API token; defaults to the module-level ``my_token``.
    """
    # Inside a MarkdownV2 code span only '\' and '`' are special. Escape them
    # so that arbitrary text cannot break the markup.
    escaped = msg.replace('\\', '\\\\').replace('`', '\\`')
    # The context manager initializes the bot and shuts it down again, so its
    # HTTP client is not left open after every notification.
    async with telegram.Bot(token=token) as bot:
        await bot.send_message(chat_id=chat_id, text='`' + escaped + '`', parse_mode='MarkdownV2')

# Magic number that opens every WSJT-X datagram.
_WSJTX_MAGIC = b'\xad\xbc\xcb\xda'

# Message type number of a "decode" datagram.
_DECODE_MESSAGE = 2

# Length marker of a null (absent) string; it is not followed by any payload.
_NULL_LENGTH = 0xffffffff

# Four-character grid locator such as "JO31"; used to tell a trailing locator
# apart from the callsign in a CQ message.
_GRID_RE = re.compile(r'^[A-R]{2}[0-9]{2}$')


def _read_fixed(buf, pos, fmt, size):
    """Unpack one fixed-size field at ``pos`` and return ``(value, next_pos)``.

    Raises:
        ValueError: fewer than ``size`` bytes remain in ``buf``.
    """
    end = pos + size
    if end > len(buf):
        raise ValueError('truncated WSJT-X datagram')
    return unpack(fmt, buf[pos:end])[0], end


def _read_utf8(buf, pos):
    """Read a length-prefixed UTF-8 string and return ``(text, next_pos)``.

    Raises:
        ValueError: the field runs past the end of ``buf`` or is not valid
            UTF-8 (``UnicodeDecodeError`` is a ``ValueError``).
    """
    size, pos = _read_fixed(buf, pos, '>I', 4)
    if size == _NULL_LENGTH:
        return '', pos
    end = pos + size
    if end > len(buf):
        raise ValueError('truncated WSJT-X datagram')
    return buf[pos:end].decode('utf-8'), end


def _parse_decode(datagram):
    """Parse a WSJT-X "decode" datagram.

    Returns:
        A dict with the keys ``new``, ``time`` (``HHMMSSz`` string), ``snr``,
        ``dt``, ``df``, ``mode`` and ``message``, or ``None`` when the
        datagram does not start with the WSJT-X magic number or carries a
        different message type.

    Raises:
        ValueError: the datagram is truncated or contains invalid UTF-8.
    """
    if datagram[:4] != _WSJTX_MAGIC:
        return None
    pos = 4
    _scheme, pos = _read_fixed(datagram, pos, '>I', 4)
    messagetype, pos = _read_fixed(datagram, pos, '>I', 4)
    if messagetype != _DECODE_MESSAGE:
        return None

    _sender_id, pos = _read_utf8(datagram, pos)
    new, pos = _read_fixed(datagram, pos, '?', 1)
    millis, pos = _read_fixed(datagram, pos, '>I', 4)
    snr, pos = _read_fixed(datagram, pos, '>i', 4)
    dt, pos = _read_fixed(datagram, pos, '>d', 8)
    df, pos = _read_fixed(datagram, pos, '>I', 4)
    mode, pos = _read_utf8(datagram, pos)
    message, pos = _read_utf8(datagram, pos)
    # The flags that follow the message text are not needed and are left
    # unread, so datagrams that lack them are still accepted.

    # Integer arithmetic keeps the seconds field from ever rounding up to 60.
    hours, rest = divmod(millis // 1000, 3600)
    minutes, seconds = divmod(rest, 60)
    return {
        'new': new,
        'time': '{0:02d}{1:02d}{2:02d}z'.format(hours, minutes, seconds),
        'snr': snr,
        'dt': dt,
        'df': df,
        'mode': mode,
        'message': message,
    }


def _cq_caller(message):
    """Return the callsign of the station calling CQ, or ``None``.

    Handles "CQ CALL", "CQ CALL GRID" and directed calls such as
    "CQ DX CALL GRID": a trailing grid locator is dropped and the last
    remaining word is taken as the callsign.
    """
    words = message.split()
    if len(words) < 2 or words[0] != 'CQ':
        return None
    tail = words[1:]
    if len(tail) > 1 and _GRID_RE.match(tail[-1]):
        tail = tail[:-1]
    return tail[-1]


def _lookup_callsign(call):
    """Ask Cloudlog whether ``call`` is in the configured logbook on ``band``.

    Returns:
        The value of the "result" field of the API answer, or ``None`` when
        the request failed or the answer could not be interpreted.
    """
    payload = {
        "key" : api_key,
        "logbook_public_slug" : public_slug,
        "band" : band,
        "callsign" : call
    }
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    try:
        # The timeout keeps a stalled server from blocking the listener.
        reply = requests.post(url, json=payload, headers=headers, timeout=10)
        answer = reply.json()
    except (requests.RequestException, ValueError):
        return None
    if not isinstance(answer, dict):
        return None
    return answer.get("result")


# Listen forever and report every CQ from a station that is not in the log yet.
while True:
    data, _ = rxsock.recvfrom(bufferSize)
    try:
        decode = _parse_decode(data)
    except ValueError:
        # A malformed datagram must not stop the monitor; skip it.
        continue
    if decode is None:
        continue

    call = _cq_caller(decode['message'])
    if call is None:
        continue

    status = _lookup_callsign(call)
    # Notify only for a new initial: skip failed lookups and stations that
    # Cloudlog reports as already "Found" in the logbook.
    if status is None or status == "Found":
        continue

    string = f"{decode['time']} {decode['snr']:3d} {round(decode['dt'], 2)} {decode['mode']} {decode['message']}"
    asyncio.run(send(string, chat_id, my_token))