2014年12月24日水曜日

F#における非同期Socket

.NET FrameworkにはVersion 1.0からSocketクラスがあり、APM; Asynchronous Programming Modelと呼ばれるBegin / End系メソッドが用意されています。しかし実際にはIOの度にIAsyncResultオブジェクトを作成する必要があり、ハイパフォーマンスなアプリケーションは実現しづらいものでした。 そのため、Version 2.0 SP1にてSocketAsyncEventArgsクラス及びAsync系のメソッドが新規に追加されました。こちらは内部状態を持つSocketAsyncEventArgsクラスを再利用することで効率の良い非同期処理が行えるものとなっています。なお、Version 4.5で導入された非同期処理とメソッド名の命名規則が一致していますが全くの別物となっています。
これをF#で扱えないものかと検索したところF#-friendly SocketAsyncEventArgsを見つけました。ただし、残念なことにSocketAsyncEventArgsクラスの設計思想を意識されておらず、毎回SocketAsyncEventArgsオブジェクトを再作成するだけの単なるwrapperでしかありませんでした。さらに言えばF#には非同期ワークフローもありますからこちらも利用したいところです。 仕方がないので自作してみました。

module Sayuri.Net.SocketExtensions
open System
open System.Net
open System.Net.Sockets
#if NET4
type private ConcurrentBag<'T> = System.Collections.Concurrent.ConcurrentBag<'T>
#else
type private ConcurrentBag<'T>() =
let bag = System.Collections.Generic.Stack<'T>()
member this.TryTake() =
lock bag (fun () -> if 0 < bag.Count then true, bag.Pop() else false, Unchecked.defaultof<_>)
member this.Add(item) =
lock bag (fun () -> bag.Push item)
#endif
let inline private checkNonNull name arg =
match box arg with null -> nullArg name | _ -> ()
let private pool = ConcurrentBag()
let private invoke methodAsync prepare result = async {
let e = match pool.TryTake() with
| true, e -> e
| false, _ -> new SocketAsyncEventArgs()
try
prepare e
return! Async.FromContinuations(fun (cont, econt, _) ->
let called = ref 0
let completed (e : SocketAsyncEventArgs) =
assert(System.Threading.Interlocked.Increment called = 1)
(e.UserToken :?> IDisposable).Dispose()
#if NET4
if e.ConnectByNameError <> null then econt e.ConnectByNameError else
#endif
if e.SocketError <> SocketError.Success then new SocketException(int e.SocketError) |> econt else
result e |> cont
e.UserToken <- e.Completed.Subscribe completed
if methodAsync e |> not then completed e
)
finally
e.AcceptSocket <- null
e.BufferList <- null
e.RemoteEndPoint <- null
e.SocketFlags <- SocketFlags.None
e.UserToken <- null
e.SetBuffer(null, 0, 0)
pool.Add(e)
}
let private setBuffer buffer offset count (e : SocketAsyncEventArgs) =
let offset = defaultArg offset 0
let count = defaultArg count (Array.length buffer - offset)
e.SetBuffer(buffer, offset, count)
type Socket with
member this.AsyncAccept() =
invoke this.AcceptAsync
ignore
(fun e -> e.AcceptSocket)
member this.AsyncAccept(buffer, ?offset, ?count) =
invoke this.AcceptAsync
(fun e -> setBuffer buffer offset count e
assert ((this.LocalEndPoint.Serialize().Size + 16) * 2 < e.Count)) // test buffer size.
(fun e -> e.AcceptSocket, e.BytesTransferred)
member this.AsyncAccept(acceptSocket) =
checkNonNull "acceptSocket" acceptSocket
invoke this.AcceptAsync
(fun e -> e.AcceptSocket <- acceptSocket)
ignore
member this.AsyncAccept(acceptSocket, buffer, ?offset, ?count) =
checkNonNull "acceptSocket" acceptSocket
checkNonNull "buffer" buffer
invoke this.AcceptAsync
(fun e -> setBuffer buffer offset count e
assert ((this.LocalEndPoint.Serialize().Size + 16) * 2 < e.Count) // test buffer size.
e.AcceptSocket <- acceptSocket)
(fun e -> e.BytesTransferred)
member this.AsyncConnect(remoteEndPoint) =
checkNonNull "remoteEndPoint" remoteEndPoint
invoke this.ConnectAsync
(fun e -> e.RemoteEndPoint <- remoteEndPoint)
ignore
member this.AsyncConnect(remoteEndPoint, buffer, ?offset, ?count) =
checkNonNull "remoteEndPoint" remoteEndPoint
checkNonNull "buffer" buffer
invoke this.ConnectAsync
(fun e -> setBuffer buffer offset count e
e.RemoteEndPoint <- remoteEndPoint)
(fun e -> e.BytesTransferred)
member this.AsyncConnect(host, port) =
checkNonNull "host" host
if port < IPEndPoint.MinPort || IPEndPoint.MaxPort < port then ArgumentOutOfRangeException "port" |> raise
#if NET4
invoke this.ConnectAsync
(fun e -> e.RemoteEndPoint <- DnsEndPoint(host, port))
ignore
#else
Async.FromBeginEnd<string, _, _>(host, port, this.BeginConnect, this.EndConnect)
#endif
member this.AsyncDisconnect(reuseSocket) =
invoke this.DisconnectAsync
(fun e -> e.DisconnectReuseSocket <- reuseSocket)
ignore
member this.AsyncReceive(buffer, ?offset, ?count, ?socketFlags) =
checkNonNull "buffer" buffer
invoke this.ReceiveAsync
(fun e -> setBuffer buffer offset count e
e.SocketFlags <- defaultArg socketFlags SocketFlags.None)
(fun e -> e.BytesTransferred)
member this.AsyncReceive(bufferList, ?socketFlags) =
checkNonNull "bufferList" bufferList
invoke this.ReceiveAsync
(fun e -> e.BufferList <- bufferList
e.SocketFlags <- defaultArg socketFlags SocketFlags.None)
(fun e -> e.BytesTransferred)
member this.AsyncSend(buffer, ?offset, ?count, ?socketFlags) =
checkNonNull "buffer" buffer
invoke this.SendAsync
(fun e -> setBuffer buffer offset count e
e.SocketFlags <- defaultArg socketFlags SocketFlags.None)
(fun e -> e.BytesTransferred)
member this.AsyncSend(bufferList, ?socketFlags) =
checkNonNull "bufferList" bufferList
invoke this.SendAsync
(fun e -> e.BufferList <- bufferList
e.SocketFlags <- defaultArg socketFlags SocketFlags.None)
(fun e -> e.BytesTransferred)
書いただけでまだ使っていないので動くかわかりません。
蛇足ですが、Socketクラスは内部でWinsockを使っていますが、このWinsockの機能の一つにaccept()で接続を受け付けると同時にrecv()を行うことができます。また対称にconnect()と同時にsend()もできます。こうすることでHTTPなど一般的なプロトコルでリクエストの送受信ができ、システムコール回数を減らし、システムの応答性能が向上します。Socketクラスはこの機能に対応しているため、今回の拡張メソッドにも含めています。