NPS 客户端核心解析:连接管理与流量转发

引言

在 NPS 系列文章的前几篇中,我们详细剖析了 NPS 服务端的各个模块和代理实现。本篇文章将转向 NPS 的 客户端(Client) 模块,深入分析 nps/client/client.go 文件。这个文件是 NPS 客户端的“大脑”,负责与服务端建立连接、管理隧道、处理不同类型的流量以及维护客户端的生命周期。理解客户端的运作机制,是掌握 NPS 完整内网穿透流程的关键。

client.go:客户端的“大脑”

client.go 文件定义了 TRPClient 结构体,它是 NPS 客户端的核心。它负责客户端的启动、与服务端的通信、隧道管理以及流量转发。

TRPClient 结构体

TRPClient 结构体包含了客户端运行所需的所有关键信息:

type TRPClient struct {
    svrAddr        string
    bridgeConnType string
    proxyUrl       string
    vKey           string
    p2pAddr        map[string]string
    tunnel         *nps_mux.Mux
    signal         *conn.Conn
    ticker         *time.Ticker
    cnf            *config.Config
    disconnectTime int
    once           sync.Once
}
  • svrAddr string:NPS 服务端的地址。
  • bridgeConnType string:与服务端连接的桥接类型(例如 TCP、KCP 等)。
  • proxyUrl string:如果客户端通过代理连接服务端,则为代理地址。
  • vKey string:客户端的验证密钥。
  • p2pAddr map[string]string:用于 P2P 连接的地址映射。
  • tunnel *nps_mux.Mux:基于 nps_mux 实现的多路复用隧道连接,用于承载多个逻辑连接。
  • signal *conn.Conn:与服务端建立的主控制连接,用于发送心跳、接收服务端指令等。
  • ticker *time.Ticker:用于定时发送心跳包。
  • cnf *config.Config:客户端的配置信息,包括健康检查等。
  • disconnectTime int:连接断开后的重连时间。
  • once sync.Once:用于确保 Close() 方法只执行一次。

NewRPClient() 函数用于创建并初始化一个 TRPClient 实例。

Start() 方法:客户端的启动流程

TRPClientStart() 方法是客户端的入口点,它负责建立与服务端的连接并启动各项服务:

func (s *TRPClient) Start() {
    CloseClient = false // 全局变量,控制客户端关闭
retry:
    if CloseClient {
        return
    }
    NowStatus = 0 // 全局变量,表示客户端当前状态
    c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl) // 建立主控制连接
    if err != nil {
        logs.Error("The connection server failed and will be reconnected in five seconds, error", err.Error())
        time.Sleep(time.Second * 5)
        goto retry // 连接失败,5秒后重试
    }
    if c == nil {
        logs.Error("Error data from server, and will be reconnected in five seconds")
        time.Sleep(time.Second * 5)
        goto retry // 服务端返回错误数据,5秒后重试
    }
    logs.Info("Successful connection with server %s", s.svrAddr)
    go s.ping() // 启动心跳协程
    s.signal = c // 存储主控制连接
    go s.newChan() // 启动多路复用隧道连接
    if s.cnf != nil && len(s.cnf.Healths) > 0 {
        go heathCheck(s.cnf.Healths, s.signal) // 启动健康检查
    }
    NowStatus = 1 // 客户端状态设置为已连接
    s.handleMain() // 处理主控制连接的指令
}
  • 连接重试机制:客户端会尝试连接服务端,如果失败则等待 5 秒后重试,直到连接成功。
  • 主控制连接 (signal):通过 NewConn() 函数建立与服务端的主控制连接,用于发送心跳、接收服务端指令等。
  • 心跳机制 (ping()):启动一个 goroutine 定时发送心跳包,以维持连接活性并检测服务端状态。
  • 多路复用隧道 (newChan()):启动一个 goroutine 建立基于 nps_mux 的多路复用隧道,所有实际的隧道流量都将通过这个隧道传输。
  • 健康检查 (heathCheck()):如果配置了健康检查,则启动一个 goroutine 定期检查本地服务的健康状态。
  • 主指令处理 (handleMain()):在主 goroutine 中,客户端会持续读取 signal 连接的数据,处理来自服务端的指令。

handleMain():处理服务端指令

handleMain() 方法在一个循环中读取 signal 连接的数据,并根据不同的 flags 处理来自服务端的指令:

func (s *TRPClient) handleMain() {
    for {
        flags, err := s.signal.ReadFlag() // 读取指令标志
        if err != nil {
            logs.Error("Accept server data error %s, end this service", err.Error())
            break // 读取失败,断开连接
        }
        switch flags {
        case common.NEW_UDP_CONN: // 新的 UDP 连接请求
            // ... 处理 UDP 连接 ...
        }
    }
    s.Close() // 循环结束,关闭客户端
}

目前 handleMain() 中只处理了 common.NEW_UDP_CONN 类型的指令,这表明服务端可以通过主控制连接通知客户端建立新的 UDP 连接。

newChan():建立多路复用隧道

newChan() 方法负责建立与服务端的多路复用隧道,并接受来自服务端的逻辑连接:

func (s *TRPClient) newChan() {
    tunnel, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl) // 建立隧道连接
    if err != nil {
        logs.Error("connect to ", s.svrAddr, "error:", err)
        return
    }
    s.tunnel = nps_mux.NewMux(tunnel.Conn, s.bridgeConnType, s.disconnectTime) // 创建多路复用器
    for {
        src, err := s.tunnel.Accept() // 接受来自服务端的逻辑连接
        if err != nil {
            logs.Warn(err)
            s.Close() // 接受失败,关闭客户端
            break
        }
        go s.handleChan(src) // 处理逻辑连接
    }
}
  • NewConn():再次调用 NewConn() 建立一个类型为 common.WORK_CHAN 的连接,作为多路复用隧道的底层连接。
  • nps_mux.NewMux():使用 nps_mux 库将底层连接封装为多路复用器。
  • s.tunnel.Accept():在一个循环中,客户端会持续接受来自服务端的多路复用隧道上的逻辑连接。每个逻辑连接都代表一个具体的隧道请求(例如 TCP 隧道、HTTP 代理等)。
  • s.handleChan(src):为每个接受到的逻辑连接启动一个 goroutine,进行具体的流量转发处理。

handleChan():处理逻辑连接与流量转发

handleChan() 方法是客户端处理具体隧道流量的核心。它负责解析服务端发送的连接信息,并建立与本地目标服务的连接,然后进行数据转发:

func (s *TRPClient) handleChan(src net.Conn) {
    lk, err := conn.NewConn(src).GetLinkInfo() // 获取连接信息
    if err != nil || lk == nil {
        src.Close()
        logs.Error("get connection info from server error ", err)
        return
    }
    lk.Host = common.FormatAddress(lk.Host) // 格式化目标地址

    if lk.ConnType == "http" { // HTTP 代理
        // ... 处理 HTTP 请求 ...
        return
    }
    if lk.ConnType == "udp5" { // UDP 代理 (SOCKS5 UDP ASSOCIATE)
        logs.Trace("new %s connection with the goal of %s, remote address:%s", lk.ConnType, lk.Host, lk.RemoteAddr)
        s.handleUdp(src)
        return
    }

    // TCP 或其他类型连接
    if targetConn, err := net.DialTimeout(lk.ConnType, lk.Host, lk.Option.Timeout); err != nil {
        logs.Warn("connect to %s error %s", lk.Host, err.Error())
        src.Close()
    } else {
        logs.Trace("new %s connection with the goal of %s, remote address:%s", lk.ConnType, lk.Host, lk.RemoteAddr)
        // ... PROXY Protocol 处理 ...
        conn.CopyWaitGroup(src, targetConn, lk.Crypt, lk.Compress, nil, nil, false, nil, nil) // 数据拷贝
    }
}
  • conn.NewConn(src).GetLinkInfo():从逻辑连接 (src) 中读取服务端发送的 LinkInfo,其中包含了目标地址、连接类型、加密/压缩等信息。
  • HTTP 代理处理:如果 lk.ConnType 是 “http”,则会进入专门的 HTTP 请求处理逻辑,包括读取 HTTP 请求、转发给本地目标、读取响应并写回。
  • UDP 代理处理 (handleUdp()):如果 lk.ConnType 是 “udp5”(SOCKS5 UDP ASSOCIATE),则调用 s.handleUdp(src) 进行 UDP 流量转发。
  • TCP 或其他类型连接:对于 TCP 或其他类型的连接,客户端会直接 net.DialTimeout() 连接到本地的目标服务 (lk.Host)。
  • PROXY Protocol 支持:如果 lk.ProtoVersion 是 “V1” 或 “V2”,客户端会在建立连接后,向目标服务发送 PROXY Protocol 头,以便目标服务获取真实的客户端 IP 地址。
  • 数据拷贝 (conn.CopyWaitGroup()):最终,通过 conn.CopyWaitGroup() 在逻辑连接 (src) 和本地目标连接 (targetConn) 之间进行双向数据拷贝,实现流量转发。

handleUdp():客户端 UDP 流量转发

handleUdp() 方法负责客户端的 UDP 流量转发,主要用于 SOCKS5 的 UDP ASSOCIATE 模式:

  1. 本地 UDP 监听:客户端在本地监听一个 UDP 端口,用于接收来自本地应用的 UDP 数据。
  2. 数据封装与转发:将本地接收到的 UDP 数据包封装为 NPS 内部的 UDP 数据报格式,并通过 serverConn(与服务端建立的逻辑连接)发送给服务端。
  3. 数据解封装与转发:从 serverConn 读取服务端转发过来的 UDP 数据,解封装后写回给本地应用。

ping():心跳机制

ping() 方法在一个定时器循环中,定期检查多路复用隧道 (s.tunnel) 是否关闭。如果隧道关闭,则调用 s.Close() 关闭客户端。

Close():客户端的关闭

Close() 方法负责客户端的优雅关闭,确保所有资源被释放:

  • 设置 CloseClient 全局变量为 true,停止重试连接。
  • 关闭 s.tunnels.signal 连接。
  • 停止 s.ticker

总结

nps/client/client.go 文件是 NPS 客户端的核心,它通过与服务端的紧密协作,实现了强大的内网穿透功能。客户端负责建立和维护与服务端的连接,通过多路复用隧道承载各种代理流量,并根据服务端指令将流量转发到本地目标服务。其对不同协议(HTTP、UDP、TCP)的灵活处理,以及对 PROXY Protocol 的支持,使得 NPS 客户端能够适应各种复杂的网络环境和应用场景。

在下一篇文章中,我们将继续探索 NPS 客户端的其他辅助模块,例如 control.goregister.go