Python连接港股Websocket接口的断连与重连实践
最近在写一个监控港股异动的小工具,后端是用 Python 写的。在对接行情数据时,遇到了不少网络编程的经典问题,特此记录一下。 问题背景: 需求很简单:订阅大概20只港股科技股的实时价格,一旦涨跌幅超过阈值就报警。 一开始用了简单的 技术选型: 既然轮询行不通,那就必须上 WebSocket。这需要服务端支持主动推送。找了一圈,发现支持 WebSocket 的港股数据源并不多(大部分还是传统的 REST API)。最后锁定了 AllTick 的接口进行调试,文档写得比较清楚,鉴权方式也标准。 踩坑与填坑: 代码实现: 这是我封装的一个健壮的 WebSocket 客户端类(伪代码结构): 数据清洗 Tip: 拿到的原始数据通常包含很多冗余字段。为了减轻后续处理压力,建议在 process_data 函数里只提取 symbol, last_price, timestamp 这几个关键字段。 最终效果: 目前这个脚本跑在我的阿里云服务器上,内存占用不到 100MB,非常稳定。requests 轮询,结果发现要想达到实时的效果,请求频率太高,很容易触发服务端的 Rate Limit(速率限制),IP 直接被 Ban。json.loads 抛出异常。解决:加 try-catch,对于解析失败的包直接丢弃,保证主线程不挂。Close 帧。解决:必须在应用层实现心跳检测(Ping/Pong),或者设置 socket 的超时时间。import websocket
import json
def on_message(ws, message):
data = json.loads(message)
print(data)
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print("Closed")
def on_open(ws):
print("Connected to the WebSocket")
ws_url = "wss://api.alltick.co/realtime/marketdata"
ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
ws.run_forever()
def process_data(data):
symbol = data['symbol']
price = data['price']
change = data['change']
print(f"Stock: {symbol}, Price: {price}, Change: {change}%")
def on_message(ws, message):
data = json.loads(message)
process_data(data)