net.Listener 技巧 net.Pipe 的妙用

golang 的 net.Listener 和 ne.Conn 實在是很好用,標準庫的 Listener Conn 接口爲網路通訊定義了標準的函數簽名,所以任何符合此簽名的 golang 代碼都可以互相協作。先看下 Listener 定義如下:

// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
	// Accept waits for and returns the next connection to the listener.
	Accept() (Conn, error)

	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error

	// Addr returns the listener's network address.
	Addr() Addr
}

再看下 Conn 定義如下(省略了部分內容只保留了主要簽名):

// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
	// Read reads data from the connection.
	// Read can be made to time out and return an error after a fixed
	// time limit; see SetDeadline and SetReadDeadline.
	Read(b []byte) (n int, err error)

	// Write writes data to the connection.
	// Write can be made to time out and return an error after a fixed
	// time limit; see SetDeadline and SetWriteDeadline.
	Write(b []byte) (n int, err error)

	// Close closes the connection.
	// Any blocked Read or Write operations will be unblocked and return errors.
	Close() error

	...
}

請仔細思考下上述兩個接口,其實完全和網路通訊無關,應該稱爲通訊接口,任何滿足此接口的組件都可以通信,只不過我們通常將其用於網路通信並且標準庫爲網路通信實現了此兩個接口。

比如我們可以使用內存來實現上述接口來替代通常的 golang 網路庫,於是你會發現網路庫一下子變成了程式內存通訊庫,是否很有趣。同理按照此思路如果你寫一個本地內存通訊的組件,讓其實現上述接口,你在必要時就可以毫不費力的將此程式改成網路通訊組件。

因此本喵推薦,無論是在寫網路通訊組件或是內存中多個 goroutine 間通訊或其它什麼通訊組件你都應該儘量使其符合 Listener Conn 接口的定義,如此代碼將可工作在多種環境(網路,內存…)

net.Pipe

標準庫 net.Pipe 函數在內存中創建同步的全雙工通訊組件,並且其已經實現了 net.Conn 接口。一端的讀取與另一端的寫入匹配,沒有內部緩衝可以高效的在兩者間複製數據。其簽名如下

func Pipe() (Conn, Conn)

已經有了內存通訊的 net.Conn 我們在自己實現下 Listener 就可以輕鬆的將網路程式和內存通訊程式間無縫切換,有了 Pipe 實現 Listener 也很簡單,創建一個 chan 當 Accept 時從 chan 中讀取即可,而Dial 時調用 Pipe 將 返回的兩個 Conn 一個傳入 chan 另一個返回給調用者即可,本喵的實現如下:

package main

import (
	"context"
	"errors"
	"net"
	"sync"
	"sync/atomic"
)

var ErrPipeListenerClosed = errors.New(`pipe listener already closed`)

type PipeListener struct {
	ch    chan net.Conn
	close chan struct{}
	done  uint32
	m     sync.Mutex
}

func ListenPipe() *PipeListener {
	return &PipeListener{
		ch:    make(chan net.Conn),
		close: make(chan struct{}),
	}
}

// Accept waits for and returns the next connection to the listener.
func (l *PipeListener) Accept() (c net.Conn, e error) {
	select {
	case c = <-l.ch:
	case <-l.close:
		e = ErrPipeListenerClosed
	}
	return
}

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (l *PipeListener) Close() (e error) {
	if atomic.LoadUint32(&l.done) == 0 {
		l.m.Lock()
		defer l.m.Unlock()
		if l.done == 0 {
			defer atomic.StoreUint32(&l.done, 1)
			close(l.close)
			return
		}
	}
	e = ErrPipeListenerClosed
	return
}

// Addr returns the listener's network address.
func (l *PipeListener) Addr() net.Addr {
	return pipeAddr(0)
}
func (l *PipeListener) Dial(network, addr string) (net.Conn, error) {
	return l.DialContext(context.Background(), network, addr)
}
func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) {
	// check closed
	if atomic.LoadUint32(&l.done) != 0 {
		e = ErrPipeListenerClosed
		return
	}

	// pipe
	c0, c1 := net.Pipe()
	// waiting accepted or closed or done
	select {
	case <-ctx.Done():
		e = ctx.Err()
	case l.ch <- c0:
		conn = c1
	case <-l.close:
		c0.Close()
		c1.Close()
		e = ErrPipeListenerClosed
	}
	return
}

type pipeAddr int

func (pipeAddr) Network() string {
	return `pipe`
}
func (pipeAddr) String() string {
	return `pipe`
}

最後來寫個調用代碼來將 http 中使用的 tcp 通訊改爲內存通訊:

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"net/http"
)

func main() {
	log.SetFlags(log.Lshortfile | log.LstdFlags)
	l := ListenPipe()
	log.Println(`listen on pipe`)
	go runServer(l)
	runClient(l.Dial, l.DialContext)
}
func runClient(dial func(network, addr string) (net.Conn, error), dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) {
	client := &http.Client{
		Transport: &http.Transport{
			Dial:        dial,
			DialContext: dialContext,
		},
	}
	printResponse(client.Get(`http://localhost/speak`))
	printResponse(client.Get(`http://localhost/version`))
}
func printResponse(resp *http.Response, e error) {
	if e != nil {
		log.Fatalln(e)
	}
	b, e := ioutil.ReadAll(resp.Body)
	if e != nil {
		log.Fatalln(e)
	}
	fmt.Println(resp.Status)
	fmt.Println(string(b))
}
func runServer(l net.Listener) {
	m := http.NewServeMux()
	m.HandleFunc(`/speak`, func(rw http.ResponseWriter, r *http.Request) {
		rw.Header().Set(`Content-Type`, `text/plain; charset=utf-8`)
		rw.Write([]byte(`server serve on pipe`))
	})
	m.HandleFunc(`/version`, func(rw http.ResponseWriter, r *http.Request) {
		rw.Header().Set(`Content-Type`, `text/plain; charset=utf-8`)
		rw.Write([]byte(`pipe server v1.0.0`))
	})
	http.Serve(l, m)
}

其它有趣的用法

除去上述用法,本喵也想到了另外一些有趣的玩法,此處只是一提,如果後面有空將單獨發佈文章討論實現細節。

grpc-gateway 是一個 grpc 的反向代理,可以爲 grpc 提供一個 http 的1.1兼容接口,其原理是提供了一套 http 接口供調用者使用 http 調用,內部發送 grpc 請求到 grpc 服務,最後將 grpc 的響應再轉爲 http 響應返回給調用者。gateway 到 grpc 間的通訊要經過網卡使用 tcp 協議通訊,如果 gateway 和 grpc 在同個服務器上其實可以將它們放到一個程式中並讓 grpc 工作在上述的 PipeListener 接口上,這樣就可以讓 gateway 和 grpc 直接內存通訊而不用經過網卡。

反向木馬寫起來很簡單但很繁瑣,如果可以用 grpc 則會簡單很多,可以自己實現 Listener 讓撥號的木馬端作爲 grpc 服務器這看起來也很有趣。另外不止反向木馬其實只要是類似這種需要顛倒撥號方的情況都可以使用自己實現的 Listener 來讓其代碼看起來和沒有顛倒的正常程式一樣舒服。

此外將一個協議的 Listener 套到另外一個協議的 Listener 一下子就實現了各種 over 操作,比如 vnc over ssh,ssh over websocket,甚至 vnc over ssh over websocket 之類的有趣事情。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *