最近在写一个监控港股异动的小工具,后端是用 Python 写的。在对接行情数据时,遇到了不少网络编程的经典问题,特此记录一下。

问题背景: 需求很简单:订阅大概20只港股科技股的实时价格,一旦涨跌幅超过阈值就报警。 一开始用了简单的 requests 轮询,结果发现要想达到实时的效果,请求频率太高,很容易触发服务端的 Rate Limit(速率限制),IP 直接被 Ban。

技术选型: 既然轮询行不通,那就必须上 WebSocket。这需要服务端支持主动推送。找了一圈,发现支持 WebSocket 的港股数据源并不多(大部分还是传统的 REST API)。最后锁定了 AllTick 的接口进行调试,文档写得比较清楚,鉴权方式也标准。

踩坑与填坑

  1. JSON 解析错误:服务端推送的数据并不总是完美的 JSON,有时候网络包截断会导致 json.loads 抛出异常。解决:加 try-catch,对于解析失败的包直接丢弃,保证主线程不挂。
  2. 僵尸连接:有时候网络实际上已经断了,但客户端没有收到 Close 帧。解决:必须在应用层实现心跳检测(Ping/Pong),或者设置 socket 的超时时间。

代码实现: 这是我封装的一个健壮的 WebSocket 客户端类(伪代码结构):

import websocket
import json
 
def on_message(ws, message):
    data = json.loads(message)
    print(data)
 
def on_error(ws, error):
    print(error)
 
def on_close(ws, close_status_code, close_msg):
    print("Closed")
 
def on_open(ws):
    print("Connected to the WebSocket")
 
ws_url = "wss://api.alltick.co/realtime/marketdata"
ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
ws.run_forever()

def process_data(data):
    symbol = data['symbol']
    price = data['price']
    change = data['change']
    print(f"Stock: {symbol}, Price: {price}, Change: {change}%")
 
def on_message(ws, message):
    data = json.loads(message)
    process_data(data)

数据清洗 Tip: 拿到的原始数据通常包含很多冗余字段。为了减轻后续处理压力,建议在 process_data 函数里只提取 symbol, last_price, timestamp 这几个关键字段。

最终效果: 目前这个脚本跑在我的阿里云服务器上,内存占用不到 100MB,非常稳定。

标签: none

添加新评论