TCPConnection
A TCP connection. When connecting, the Happy Eyeballs algorithm is used.
The following code creates a client that connects to port 8989 of the local host, writes "hello world", and listens for a response, which it then prints.
use "net"

class MyTCPConnectionNotify is TCPConnectionNotify
  let _out: OutStream

  new create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    conn.write("hello world")

  fun ref received(
    conn: TCPConnection ref,
    data: Array[U8] iso,
    times: USize)
    : Bool
  =>
    _out.print("GOT:" + String.from_array(consume data))
    conn.close()
    true

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        recover MyTCPConnectionNotify(env.out) end, "", "8989")
    end
Note: when writing to the connection, data will be silently discarded if the connection has not yet been established.
Backpressure support
Write
The TCP protocol has built-in backpressure support. This is generally experienced as the outgoing write buffer becoming full and being unable to write all requested data to the socket. In TCPConnection, this is hidden from the programmer. When this occurs, TCPConnection will buffer the extra data until such time as it is able to be sent. Left unchecked, this could result in uncontrolled queuing. To address this, TCPConnectionNotify implements two methods, throttled and unthrottled, that are called when backpressure is applied and released.
Upon receiving a throttled notification, your application has two choices on how to handle it. One is to inform the Pony runtime that it can no longer make progress and that runtime backpressure should be applied to any actors sending this one messages. For example, you might construct your application like:
// Here we have a TCPConnectionNotify that upon construction
// is given a BackpressureAuth token. This allows the notifier
// to inform the Pony runtime when to apply and release backpressure
// as the connection experiences it.
// Note the calls to
//
// Backpressure.apply(_auth)
// Backpressure.release(_auth)
//
// that apply and release backpressure as needed

use "backpressure"
use "collections"
use "net"

class SlowDown is TCPConnectionNotify
  let _auth: BackpressureAuth
  let _out: StdStream

  new iso create(auth: BackpressureAuth, out: StdStream) =>
    _auth = auth
    _out = out

  fun ref throttled(connection: TCPConnection ref) =>
    _out.print("Experiencing backpressure!")
    Backpressure.apply(_auth)

  fun ref unthrottled(connection: TCPConnection ref) =>
    _out.print("Releasing backpressure!")
    Backpressure.release(_auth)

  fun ref closed(connection: TCPConnection ref) =>
    // if backpressure has been applied, make sure we release
    // when shutting down
    _out.print("Releasing backpressure if applied!")
    Backpressure.release(_auth)

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      let auth = env.root as AmbientAuth
      let socket = TCPConnection(auth, recover SlowDown(auth, env.out) end, "", "7669")
    end
Or if you want, you could handle backpressure by shedding load, that is, dropping the extra data rather than carrying out the send. This might look like:
use "net"

class ThrowItAway is TCPConnectionNotify
  var _throttled: Bool = false

  fun ref sent(conn: TCPConnection ref, data: ByteSeq): ByteSeq =>
    if not _throttled then
      data
    else
      ""
    end

  fun ref sentv(conn: TCPConnection ref, data: ByteSeqIter): ByteSeqIter =>
    if not _throttled then
      data
    else
      recover Array[String] end
    end

  fun ref throttled(connection: TCPConnection ref) =>
    _throttled = true

  fun ref unthrottled(connection: TCPConnection ref) =>
    _throttled = false

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth, recover ThrowItAway end, "", "7669")
    end
In general, unless you have a very specific use case, we strongly advise that you don't implement a load shedding scheme where you drop data.
Read
If your application is unable to keep up with data being sent to it over a TCPConnection, you can use the built-in read backpressure support to pause reading the socket, which will in turn start to exert backpressure on the corresponding writer on the other end of that socket.
The mute behavior allows any other actor in your application to request the cessation of additional reads until such time as unmute is called. Please note that this cessation is not guaranteed to happen immediately, as it is the result of an asynchronous behavior call and as such will have to wait for existing messages in the TCPConnection's mailbox to be handled.
On non-Windows platforms, your TCPConnection will not notice if the other end of the connection closes until you unmute it. Unix-like systems such as FreeBSD, Linux and OSX learn about a closed connection upon read. On these platforms, you must call unmute on a muted connection to have it close. Without calling unmute, the TCPConnection actor will never exit.
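As a minimal sketch of read backpressure, assuming a hypothetical Worker actor that stands in for any slow consumer, a notifier can mute the connection whenever data arrives and let the worker unmute it once that data has been handled. The names Worker, process and PauseWhileBusy are illustrative and not part of the net package; the port is arbitrary.

```pony
use "net"

// Hypothetical actor standing in for any slow consumer of received data.
actor Worker
  be process(conn: TCPConnection, data: Array[U8] val) =>
    // ... slow work on `data` would happen here ...
    // Resume reading once this chunk has been handled.
    conn.unmute()

class PauseWhileBusy is TCPConnectionNotify
  let _worker: Worker

  new iso create(worker: Worker) =>
    _worker = worker

  fun ref received(
    conn: TCPConnection ref,
    data: Array[U8] iso,
    times: USize)
    : Bool
  =>
    // Ask the connection to stop reading, then hand the data off.
    conn.mute()
    _worker.process(conn, consume data)
    true

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        PauseWhileBusy(Worker), "", "8989")
    end
```

Because mute is an asynchronous behavior, a few more received calls may still arrive before reading actually pauses, so the worker in this sketch has to tolerate some extra chunks. Since every chunk ends with an unmute call, the connection is not left muted indefinitely.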
Proxy support
Using the proxy_via callback in a TCPConnectionNotify, it is possible to implement proxies. The function takes the intended destination host and service as parameters and returns a 2-tuple of the proxy host and service.
The proxy TCPConnectionNotify should decorate another implementation of TCPConnectionNotify, passing relevant data through.
Example proxy implementation
use "net"

actor Main
  new create(env: Env) =>
    try
      MyClient.create(
        env.root as AmbientAuth,
        "example.com", // we actually want to connect to this host
        "80",
        ExampleProxy.create("proxy.example.com", "80")) // we connect via this proxy
    end

actor MyClient
  new create(auth: AmbientAuth, host: String, service: String,
    proxy: Proxy val = NoProxy)
  =>
    // MyConnectionNotify is the application's own notifier (not shown here)
    let conn: TCPConnection = TCPConnection.create(
      auth,
      proxy.apply(MyConnectionNotify.create()),
      host,
      service)

class val ExampleProxy is Proxy
  let _proxy_host: String
  let _proxy_service: String

  new val create(proxy_host: String, proxy_service: String) =>
    _proxy_host = proxy_host
    _proxy_service = proxy_service

  fun apply(wrap: TCPConnectionNotify iso): TCPConnectionNotify iso^ =>
    ExampleProxyNotify.create(consume wrap, _proxy_host, _proxy_service)

class iso ExampleProxyNotify is TCPConnectionNotify
  // Fictional proxy implementation that has no error
  // conditions, and always forwards the connection.
  let _proxy_host: String
  let _proxy_service: String
  var _destination_host: String = ""
  var _destination_service: String = ""
  let _wrapped: TCPConnectionNotify ref

  new iso create(wrap: TCPConnectionNotify iso, proxy_host: String, proxy_service: String) =>
    _wrapped = consume wrap
    _proxy_host = proxy_host
    _proxy_service = proxy_service

  fun ref proxy_via(host: String, service: String): (String, String) =>
    // Stash the original host & service; return the host & service
    // for the proxy; indicating that the initial TCP connection should
    // be made to the proxy
    _destination_host = host
    _destination_service = service
    (_proxy_host, _proxy_service)

  fun ref connected(conn: TCPConnection ref) =>
    // conn is the connection to the *proxy* server. We need to ask the
    // proxy server to forward this connection to our intended final
    // destination.
    conn.write((_destination_host + "\n").array())
    conn.write((_destination_service + "\n").array())
    _wrapped.connected(conn)

  fun ref received(conn: TCPConnection ref, data: Array[U8] iso, times: USize): Bool =>
    _wrapped.received(conn, consume data, times)

  fun ref connect_failed(conn: TCPConnection ref) =>
    None
actor tag TCPConnection
Constructors
create
Connect via IPv4 or IPv6. If from is a non-empty string, the connection will be made from the specified interface.
new tag create(
  auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val),
  notify: TCPConnectionNotify iso,
  host: String val,
  service: String val,
  from: String val = "",
  read_buffer_size: USize val = 16384,
  yield_after_reading: USize val = 16384,
  yield_after_writing: USize val = 16384)
: TCPConnection tag^
Parameters
- auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val)
- notify: TCPConnectionNotify iso
- host: String val
- service: String val
- from: String val = ""
- read_buffer_size: USize val = 16384
- yield_after_reading: USize val = 16384
- yield_after_writing: USize val = 16384
Returns
- TCPConnection tag^
ip4
Connect via IPv4.
new tag ip4(
  auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val),
  notify: TCPConnectionNotify iso,
  host: String val,
  service: String val,
  from: String val = "",
  read_buffer_size: USize val = 16384,
  yield_after_reading: USize val = 16384,
  yield_after_writing: USize val = 16384)
: TCPConnection tag^
Parameters
- auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val)
- notify: TCPConnectionNotify iso
- host: String val
- service: String val
- from: String val = ""
- read_buffer_size: USize val = 16384
- yield_after_reading: USize val = 16384
- yield_after_writing: USize val = 16384
Returns
- TCPConnection tag^
ip6
Connect via IPv6.
new tag ip6(
  auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val),
  notify: TCPConnectionNotify iso,
  host: String val,
  service: String val,
  from: String val = "",
  read_buffer_size: USize val = 16384,
  yield_after_reading: USize val = 16384,
  yield_after_writing: USize val = 16384)
: TCPConnection tag^
Parameters
- auth: (AmbientAuth val | NetAuth val | TCPAuth val | TCPConnectAuth val)
- notify: TCPConnectionNotify iso
- host: String val
- service: String val
- from: String val = ""
- read_buffer_size: USize val = 16384
- yield_after_reading: USize val = 16384
- yield_after_writing: USize val = 16384
Returns
- TCPConnection tag^
Public Behaviours
write
Write a single sequence of bytes. Data will be silently discarded if the connection has not yet been established.
be write( data: (String val | Array[U8 val] val))
Parameters
- data: (String val | Array[U8 val] val)
writev
Write a sequence of sequences of bytes. Data will be silently discarded if the connection has not yet been established.
be writev( data: ByteSeqIter val)
Parameters
- data: ByteSeqIter val
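One common way to build the argument for writev is the Writer class from the buffered package, whose done() method returns the queued chunks as a sequence of byte sequences. The sketch below sends a length-prefixed message; the 4-byte big-endian framing, the class name SendFramed and the port are assumptions made for illustration, not something TCPConnection requires.

```pony
use "buffered"
use "net"

class SendFramed is TCPConnectionNotify
  fun ref connected(conn: TCPConnection ref) =>
    let payload = "hello world"
    let w = Writer
    // 4-byte big-endian length prefix, then the payload itself.
    w.u32_be(payload.size().u32())
    w.write(payload)
    // done() hands back the queued chunks; writev sends them in order.
    conn.writev(w.done())

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        recover SendFramed end, "", "8989")
    end
```

The write happens inside connected because data written before the connection is established would be discarded.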
mute
Temporarily suspend reading off this TCPConnection until such time as unmute is called.
be mute()
unmute
Start reading off this TCPConnection again after having been muted.
be unmute()
set_notify
Change the notifier.
be set_notify( notify: TCPConnectionNotify iso)
Parameters
- notify: TCPConnectionNotify iso
dispose
Close the connection gracefully once all writes are sent.
be dispose()
Public Functions
local_address
Return the local IP address. If this TCPConnection is closed then the address returned is invalid.
fun box local_address() : NetAddress val
Returns
- NetAddress val
remote_address
Return the remote IP address. If this TCPConnection is closed then the address returned is invalid.
fun box remote_address() : NetAddress val
Returns
- NetAddress val
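As a small sketch, the addresses can be printed from the connected callback, while the connection is known to be open. It relies on NetAddress.name() from the net package, called here with its default arguments; the class name PrintPeer and the port are hypothetical.

```pony
use "net"

class PrintPeer is TCPConnectionNotify
  let _out: OutStream

  new iso create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    try
      // name() is partial, so both lookups sit inside a try block.
      (let lhost, let lservice) = conn.local_address().name()?
      (let rhost, let rservice) = conn.remote_address().name()?
      _out.print("local  " + lhost + ":" + lservice)
      _out.print("remote " + rhost + ":" + rservice)
    end

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        PrintPeer(env.out), "", "8989")
    end
```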
expect
A received call on the notifier must contain exactly qty bytes. If qty is zero, the call can contain any amount of data. This has no effect if called in the sent notifier callback.
Errors if qty exceeds the max buffer size as indicated by the read_buffer_size supplied when the connection was created.
fun ref expect( qty: USize val = 0) : None val ?
Parameters
- qty: USize val = 0
Returns
- None val ?
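A typical use of expect is reading length-prefixed messages: ask for a fixed-size header first, then for exactly as many bytes as the header announces. The following is a minimal sketch under the assumption of a 4-byte big-endian length prefix; the framing, the class name ReadFramed and the port are illustrative only.

```pony
use "net"

class ReadFramed is TCPConnectionNotify
  let _out: OutStream
  var _reading_header: Bool = true

  new iso create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    // The first received call should carry exactly the 4-byte header.
    try conn.expect(4)? end

  fun ref received(
    conn: TCPConnection ref,
    data: Array[U8] iso,
    times: USize)
    : Bool
  =>
    let bytes: Array[U8] val = consume data
    try
      if _reading_header then
        // Decode the big-endian payload length and ask for that many bytes.
        let len =
          (bytes(0)?.usize() << 24) or (bytes(1)?.usize() << 16) or
          (bytes(2)?.usize() << 8) or bytes(3)?.usize()
        conn.expect(len)?
        _reading_header = false
      else
        // A complete payload has arrived; go back to reading a header.
        _out.print(String.from_array(bytes))
        conn.expect(4)?
        _reading_header = true
      end
    else
      // Short header, or a length larger than read_buffer_size: give up.
      conn.close()
    end
    true

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        ReadFramed(env.out), "", "8989")
    end
```

The sketch does not handle a zero-length payload: expect(0) means "any amount of data", so a real protocol would need to treat that case separately.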
set_nodelay
Turn Nagle on/off. Defaults to on. This can only be set on a connected socket.
fun ref set_nodelay( state: Bool val) : None val
Parameters
- state: Bool val
Returns
- None val
set_keepalive
Sets the TCP keepalive timeout to approximately secs seconds. Exact timing is OS dependent. If secs is zero, TCP keepalive is disabled. TCP keepalive is disabled by default. This can only be set on a connected socket.
fun ref set_keepalive( secs: U32 val) : None val
Parameters
- secs: U32 val
Returns
- None val
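Since set_nodelay and set_keepalive only work on a connected socket, the connected callback of the notifier is a natural place to call them. A minimal sketch follows; the class name TuneSocket, the 30-second value and the port are arbitrary, and state = true is assumed here to set TCP_NODELAY, that is, to switch Nagle's algorithm off.

```pony
use "net"

class TuneSocket is TCPConnectionNotify
  fun ref connected(conn: TCPConnection ref) =>
    // Assumption: true sets TCP_NODELAY, so small writes go out immediately.
    conn.set_nodelay(true)
    // Probe an idle connection after roughly 30 seconds (OS dependent).
    conn.set_keepalive(30)

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        recover TuneSocket end, "", "8989")
    end
```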
write_final
Write as much as possible to the socket. Set _writeable to false if not everything was written. On an error, close the connection. This is for data that has already been transformed by the notifier. Data will be silently discarded if the connection has not yet been established.
fun ref write_final( data: (String val | Array[U8 val] val)) : None val
Parameters
- data: (String val | Array[U8 val] val)
Returns
- None val
close
Attempt to perform a graceful shutdown. Don't accept new writes. If the connection isn't muted then we won't finish closing until we get a zero length read. If the connection is muted, perform a hard close and shut down immediately.
fun ref close() : None val
Returns
- None val
hard_close
When an error happens, do a non-graceful close.
fun ref hard_close() : None val
Returns
- None val
getsockopt
General wrapper for TCP sockets to the getsockopt(2) system call.
The caller must provide an array that is pre-allocated to be at least as large as the largest data structure that the kernel may return for the requested option.
In case of system call success, this function returns the 2-tuple:
1. The integer 0.
2. An Array[U8] of data returned by the system call's void * 4th argument. Its size is specified by the kernel via the system call's socklen_t * 5th argument.
In case of system call failure, this function returns the 2-tuple:
1. The value of errno.
2. An undefined value that must be ignored.
Usage example:
// connected() is a callback function for class TCPConnectionNotify
// (Reader comes from the buffered package, so this needs: use "buffered")
fun ref connected(conn: TCPConnection ref) =>
  match conn.getsockopt(OSSockOpt.sol_socket(), OSSockOpt.so_rcvbuf(), 4)
    | (0, let gbytes: Array[U8] iso) =>
      try
        let br = Reader.create().>append(consume gbytes)
        ifdef littleendian then
          let buffer_size = br.u32_le()?
        else
          let buffer_size = br.u32_be()?
        end
      end
    | (let errno: U32, _) =>
      // System call failed
  end
fun ref getsockopt( level: I32 val, option_name: I32 val, option_max_size: USize val = 4) : (U32 val , Array[U8 val] iso^)
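Parameters
- level: I32 val
- option_name: I32 val
- option_max_size: USize val = 4
Returns
- (U32 val , Array[U8 val] iso^)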
getsockopt_u32
Wrapper for TCP sockets to the getsockopt(2) system call where the kernel's returned option value is a C uint32_t type / Pony type U32.
In case of system call success, this function returns the 2-tuple:
1. The integer 0.
2. The *option_value returned by the kernel converted to a Pony U32.
In case of system call failure, this function returns the 2-tuple:
1. The value of errno.
2. An undefined value that must be ignored.
fun ref getsockopt_u32( level: I32 val, option_name: I32 val) : (U32 val , U32 val)
Parameters
- level: I32 val
- option_name: I32 val
Returns
- (U32 val , U32 val)
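For options whose value is a single 32-bit integer, getsockopt_u32 avoids the manual byte decoding that the general getsockopt wrapper needs. The following sketch reads the receive buffer size and matches on the returned 2-tuple; the class name ShowRcvBuf and the port are hypothetical.

```pony
use "net"

class ShowRcvBuf is TCPConnectionNotify
  let _out: OutStream

  new iso create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    match conn.getsockopt_u32(OSSockOpt.sol_socket(), OSSockOpt.so_rcvbuf())
    | (0, let size: U32) =>
      // First element 0 means success; the second is the option value.
      _out.print("SO_RCVBUF is " + size.string())
    | (let errno: U32, _) =>
      // On failure the second element is undefined and must be ignored.
      _out.print("getsockopt failed, errno " + errno.string())
    end

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        ShowRcvBuf(env.out), "", "8989")
    end
```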
setsockopt
General wrapper for TCP sockets to the setsockopt(2) system call.
The caller is responsible for the correct size and byte contents of the option array for the requested level and option_name, including using the appropriate machine endian byte order.
This function returns 0 on success, else the value of errno on failure.
Usage example:
// connected() is a callback function for class TCPConnectionNotify
fun ref connected(conn: TCPConnection ref) =>
  let sb = Writer

  sb.u32_le(7744)             // Our desired socket buffer size
  let sbytes = Array[U8]
  for bs in sb.done().values() do
    sbytes.append(bs)
  end
  match conn.setsockopt(OSSockOpt.sol_socket(), OSSockOpt.so_rcvbuf(), sbytes)
    | 0 =>
      // System call was successful
    | let errno: U32 =>
      // System call failed
  end
fun ref setsockopt( level: I32 val, option_name: I32 val, option: Array[U8 val] ref) : U32 val
Parameters
- level: I32 val
- option_name: I32 val
- option: Array[U8 val] ref
Returns
- U32 val
setsockopt_u32
General wrapper for TCP sockets to the setsockopt(2) system call where the kernel expects an option value of a C uint32_t type / Pony type U32.
This function returns 0 on success, else the value of errno on failure.
fun ref setsockopt_u32( level: I32 val, option_name: I32 val, option: U32 val) : U32 val
Parameters
- level: I32 val
- option_name: I32 val
- option: U32 val
Returns
- U32 val
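When the option value is a plain 32-bit integer, setsockopt_u32 removes the need to build a byte array by hand. The sketch below requests a larger send buffer and checks the result; the class name GrowSndBuf, the 65536 value and the port are arbitrary choices for illustration.

```pony
use "net"

class GrowSndBuf is TCPConnectionNotify
  let _out: OutStream

  new iso create(out: OutStream) =>
    _out = out

  fun ref connected(conn: TCPConnection ref) =>
    match conn.setsockopt_u32(
      OSSockOpt.sol_socket(), OSSockOpt.so_sndbuf(), 65536)
    | 0 =>
      // The kernel accepted the request, though it may adjust the size.
      _out.print("SO_SNDBUF request accepted")
    | let errno: U32 =>
      _out.print("setsockopt failed, errno " + errno.string())
    end

  fun ref connect_failed(conn: TCPConnection ref) =>
    None

actor Main
  new create(env: Env) =>
    try
      TCPConnection(env.root as AmbientAuth,
        GrowSndBuf(env.out), "", "8989")
    end
```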
get_so_error
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, SO_ERROR, ...)
fun ref get_so_error() : (U32 val , U32 val)
Returns
- (U32 val , U32 val)
get_so_rcvbuf
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, SO_RCVBUF, ...)
fun ref get_so_rcvbuf() : (U32 val , U32 val)
Returns
- (U32 val , U32 val)
get_so_sndbuf
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, SO_SNDBUF, ...)
fun ref get_so_sndbuf() : (U32 val , U32 val)
Returns
- (U32 val , U32 val)
get_tcp_nodelay
Wrapper for the FFI call getsockopt(fd, SOL_SOCKET, TCP_NODELAY, ...)
fun ref get_tcp_nodelay() : (U32 val , U32 val)
Returns
- (U32 val , U32 val)
set_so_rcvbuf
Wrapper for the FFI call setsockopt(fd, SOL_SOCKET, SO_RCVBUF, ...)
fun ref set_so_rcvbuf( bufsize: U32 val) : U32 val
Parameters
- bufsize: U32 val
Returns
- U32 val
set_so_sndbuf
Wrapper for the FFI call setsockopt(fd, SOL_SOCKET, SO_SNDBUF, ...)
fun ref set_so_sndbuf( bufsize: U32 val) : U32 val
Parameters
- bufsize: U32 val
Returns
- U32 val
set_tcp_nodelay
Wrapper for the FFI call setsockopt(fd, SOL_SOCKET, TCP_NODELAY, ...)
fun ref set_tcp_nodelay( state: Bool val) : U32 val
Parameters
- state: Bool val
Returns
- U32 val
© 2016-2020, The Pony Developers
© 2014-2015, Causality Ltd.
Licensed under the BSD 2-Clause License.
https://stdlib.ponylang.io/net-TCPConnection