#include "precompiled.h"

#if OS_WIN
#include "lib/sysdep/win/win.h"
#include "lib/sysdep/win/wposix/wsock_internal.h"
#endif

#include "Network.h"
#include "NetworkInternal.h"

#include "ps/CStr.h"
// ERROR is defined by some windows header. Undef it
#undef ERROR
#include "ps/CLogger.h"
#include "NetLog.h"

#if !OS_WIN
#include <fcntl.h>
#include <signal.h>
#include <sys/ioctl.h>
#endif

// Record global transfer statistics (sent/recvd bytes). This will put a lock
// /unlock pair in all read and write operations.
#define RECORD_GLOBAL_STATS 1

#define GLOBAL_LOCK() pthread_mutex_lock(&g_SocketSetInternal.m_Mutex)
#define GLOBAL_UNLOCK() pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex)

CSocketSetInternal g_SocketSetInternal;

DEFINE_ERROR(NO_SUCH_HOST, "Host not found");
DEFINE_ERROR(CONNECT_TIMEOUT, "The connection attempt timed out");
DEFINE_ERROR(CONNECT_REFUSED, "The connection attempt was refused");
DEFINE_ERROR(NO_ROUTE_TO_HOST, "No route to host");
DEFINE_ERROR(CONNECTION_BROKEN, "The connection has been closed");
DEFINE_ERROR(CONNECT_IN_PROGRESS, "The connect attempt has started, but is not yet complete");
DEFINE_ERROR(PORT_IN_USE, "The port is already in use by another process");
DEFINE_ERROR(INVALID_PORT, "The port specified is either invalid, or forbidden by system or firewall policy");
DEFINE_ERROR(INVALID_PROTOCOL, "The socket type or protocol is not supported by the operating system. Make sure that the TCP/IP protocol is installed and activated");

// Map an OS error number to a PS_RESULT
PS_RESULT GetPS_RESULT(int error)
{
    switch (error)
    {
    case EWOULDBLOCK:
    case EINPROGRESS:
        return PS_OK;
    case ENETUNREACH:
    case ENETDOWN:
    case EADDRNOTAVAIL:
        return NO_ROUTE_TO_HOST;
    case ETIMEDOUT:
        return CONNECT_TIMEOUT;
    case ECONNREFUSED:
        return CONNECT_REFUSED;
    default:
        char buf[256];
        Network_GetErrorString(error, buf, sizeof(buf));
        LOG(ERROR, LOG_CAT_NET, "SocketBase.cpp::GetPS_RESULT(): Unrecognized error %s[%d]", buf, error);
        return PS_FAIL;
    }
}

CSocketAddress::CSocketAddress(int port, ESocketProtocol proto)
{
    memset(&m_Union, 0, sizeof(m_Union));
    switch (proto)
    {
    case IPv4:
        m_Union.m_IPv4.sin_family=PF_INET;
        m_Union.m_IPv4.sin_addr.s_addr=htonl(INADDR_ANY);
        m_Union.m_IPv4.sin_port=htons(port);
        break;
    case IPv6:
        m_Union.m_IPv6.sin6_family=PF_INET6;
        cpu_memcpy(&m_Union.m_IPv6.sin6_addr, &in6addr_any, sizeof(in6addr_any));
        m_Union.m_IPv6.sin6_port=htons(port);
        break;
    default:
        debug_warn("CSocketAddress::CSocketAddress: Bad proto");
    }
}

CSocketAddress CSocketAddress::Loopback(int port, ESocketProtocol proto)
{
    CSocketAddress ret;
    switch (proto)
    {
    case IPv4:
        ret.m_Union.m_IPv4.sin_family=PF_INET;
        ret.m_Union.m_IPv4.sin_addr.s_addr=htonl(INADDR_LOOPBACK);
        ret.m_Union.m_IPv4.sin_port=htons(port);
        break;
    case IPv6:
        ret.m_Union.m_IPv6.sin6_family=PF_INET6;
        cpu_memcpy(&ret.m_Union.m_IPv6.sin6_addr, &in6addr_loopback, sizeof(in6addr_loopback));
        ret.m_Union.m_IPv6.sin6_port=htons(port);
        break;
    default:
        debug_warn("CSocketAddress::CSocketAddress: Bad proto");
    }
    return ret;
}

PS_RESULT CSocketAddress::Resolve(const char *name, int port, CSocketAddress &addr)
{
    addrinfo *ai;
    int res=getaddrinfo(name, NULL, NULL, &ai);
    if (res == 0)
    {
        if (ai->ai_addrlen < sizeof(addr.m_Union))
            cpu_memcpy(&addr.m_Union, ai->ai_addr, ai->ai_addrlen);
        switch (addr.m_Union.m_Family)
        {
        case IPv4:
            addr.m_Union.m_IPv4.sin_port=htons(port);
            break;
        case IPv6:
            addr.m_Union.m_IPv6.sin6_port=htons(port);
            break;
        }
        freeaddrinfo(ai);
        return PS_OK;
    }
    // supported, but failed
    if (errno != ENOSYS)
        return NO_SUCH_HOST;
    // else: IPv6 not supported, fall back to IPv4

    hostent *he;
 
    addr.m_Union.m_IPv4.sin_family=AF_INET;
    addr.m_Union.m_IPv4.sin_port=htons(port);
    // Try to parse dot-notation IP
    addr.m_Union.m_IPv4.sin_addr.s_addr=inet_addr(name);
    if (addr.m_Union.m_IPv4.sin_addr.s_addr==INADDR_NONE) // Not a dotted IP, try name resolution
    {
        he=gethostbyname(name);
        if (!he)
            return NO_SUCH_HOST;
        addr.m_Union.m_IPv4.sin_addr=*(struct in_addr *)(he->h_addr_list[0]);
    }
    return PS_OK;
}

CStr CSocketAddress::GetString() const
{
    char convBuf[NI_MAXHOST];
    int res=getnameinfo((sockaddr *)&m_Union, sizeof(m_Union), convBuf, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
    if (res == 0)
        return CStr(convBuf);
    // getnameinfo won't return a string for the IPv6 unspecified address
    else if (m_Union.m_Family == IPv6 && res==EAI_NONAME)
        return "::";
    // supported, but failed
    else if (errno != ENOSYS)
        return "";
    // else: IPv6 not supported, fall back to IPv4

    if (m_Union.m_Family == IPv4)
    {
        sprintf(convBuf, "%d.%d.%d.%d",
            m_Union.m_IPv4.sin_addr.s_addr&0xff,
            (m_Union.m_IPv4.sin_addr.s_addr>>8)&0xff,
            (m_Union.m_IPv4.sin_addr.s_addr>>16)&0xff,
            (m_Union.m_IPv4.sin_addr.s_addr>>24)&0xff);
        return CStr(convBuf);
    }
    else
        return CStr();
}

int CSocketAddress::GetPort() const
{
    switch (m_Union.m_Family)
    {
    case IPv4:
    case IPv6:
        return ntohs(m_Union.m_IPv4.sin_port);
    }
    return -1;
}

CSocketBase::CSocketBase()
{
    m_pInternal=new CSocketInternal;
    m_Proto=UNSPEC;
    m_NonBlocking=true;
    m_State=SS_UNCONNECTED;
    m_Error=PS_OK;
}

CSocketBase::CSocketBase(CSocketInternal *pInt)
{
    m_pInternal=pInt;
    m_Proto=pInt->m_RemoteAddr.GetProtocol();
    m_State=SS_CONNECTED;
    m_Error=PS_OK;
    SetNonBlocking(true);
    
    NET_LOG("CSocketBase::CSocketBase(): Created socket from fd %d", pInt->m_fd);
}

CSocketBase::~CSocketBase()
{
    NET_LOG("CSocketBase::~CSocketBase(): fd is %d. "
        "Received: %lld bytes. Sent: %lld bytes.", m_pInternal->m_fd,
        m_pInternal->m_RecvBytes, m_pInternal->m_SentBytes);

    Destroy();
    delete m_pInternal;
}

void CSocketBase::Shutdown()
{
    GLOBAL_LOCK();
    
    if (g_SocketSetInternal.m_NumSockets)
    {
        NET_LOG("CSocketBase::Shutdown(): %d sockets still open! (forcing network shutdown)", g_SocketSetInternal.m_NumSockets);
    }

#if RECORD_GLOBAL_STATS
    NET_LOG("GLOBAL SOCKET STATISTICS: "
        "Received: %lld bytes. Sent: %lld bytes.",
        g_SocketSetInternal.m_GlobalRecvBytes,
        g_SocketSetInternal.m_GlobalSentBytes);
#endif

    GLOBAL_UNLOCK();
    if (g_SocketSetInternal.m_Thread)
    {
        AbortWaitLoop();
        pthread_join(g_SocketSetInternal.m_Thread, NULL);
    }
}

void *WaitLoopThreadMain(void *)
{
    debug_set_thread_name("net_wait");

    GLOBAL_LOCK();
    CSocketBase::RunWaitLoop();
    
    g_SocketSetInternal.m_Thread=0;
    
    GLOBAL_UNLOCK();
    return NULL;
}

PS_RESULT CSocketBase::Initialize(ESocketProtocol proto)
{
    int res=socket(proto, SOCK_STREAM, 0);

    NET_LOG("CSocketBase::Initialize(): socket() res: %d", res);

    if (res == -1)
    {
        return INVALID_PROTOCOL;
    }
    
    m_pInternal->m_fd=res;
    m_Proto=proto;

    GLOBAL_LOCK();
    if (g_SocketSetInternal.m_NumSockets == 0)
        pthread_create(&g_SocketSetInternal.m_Thread, NULL, WaitLoopThreadMain, NULL);
    g_SocketSetInternal.m_NumSockets++;
    GLOBAL_UNLOCK();

    SetNonBlocking(m_NonBlocking);

    return PS_OK;
}

void CSocketBase::Close()
{
    shutdown(m_pInternal->m_fd, SHUT_WR);
    m_State=SS_CLOSED_LOCALLY;
}

void CSocketBase::Destroy()
{
    if (m_pInternal->m_fd == -1)
    {
        m_State=SS_UNCONNECTED;
        return;
    }

    // Remove any data associated with the file descriptor
    GLOBAL_LOCK();
    debug_assert(g_SocketSetInternal.m_NumSockets > 0);

    g_SocketSetInternal.m_NumSockets--;
    g_SocketSetInternal.m_HandleMap.erase(m_pInternal->m_fd);

    if (!g_SocketSetInternal.m_NumSockets)
        AbortWaitLoop();
    GLOBAL_UNLOCK();

    // Disconnect the socket, if it is still connected
    if (m_State == SS_CONNECTED || m_State == SS_CLOSED_LOCALLY)
    {
        // This makes the other end receive a RST, but since
        // we've had no chance to close cleanly and the socket must
        // be destroyed immediately, we've got no choice
        shutdown(m_pInternal->m_fd, SHUT_RDWR);
        m_State=SS_UNCONNECTED;
    }
    // Destroy the socket
    closesocket(m_pInternal->m_fd);
    m_pInternal->m_fd=-1;
}

void CSocketBase::SetNonBlocking(bool nonblocking)
{
    m_NonBlocking=nonblocking;
#if OS_WIN
    unsigned long nb=nonblocking;
    if(!nonblocking)
        SendWaitLoopUpdate();   // Need to call WSAAsyncSelect with event=0 before ioctlsocket
    int res=ioctlsocket(m_pInternal->m_fd, FIONBIO, &nb);
    if (res != 0)
        NET_LOG("SetNonBlocking: res %d", res);
#else
    int oldflags=fcntl(m_pInternal->m_fd, F_GETFL, 0);
    if (oldflags != -1)
    {
        if (nonblocking)
            oldflags |= O_NONBLOCK;
        else
            oldflags &= ~O_NONBLOCK;
        fcntl(m_pInternal->m_fd, F_SETFL, oldflags);
    }
#endif
}

void CSocketBase::SetTcpNoDelay(bool tcpNoDelay)
{
    // Disable Nagle's Algorithm
    int data=tcpNoDelay;
    setsockopt(m_pInternal->m_fd, SOL_SOCKET, TCP_NODELAY, (const char *)&data, sizeof(data));
}

PS_RESULT CSocketBase::Read(void *buf, uint len, uint *bytesRead)
{
    int res;
    char errbuf[256];
    
    res=recv(m_pInternal->m_fd, (char *)buf, len, 0);
    if (res < 0)
    {
        *bytesRead=0;
        int error=Network_LastError;
        switch (error)
        {
        case EWOULDBLOCK:
            return PS_OK;
        /*case ENETDOWN:
        case ENETRESET:
        case ENOTCONN:
        case ESHUTDOWN:
        case ECONNABORTED:
        case ECONNRESET:
        case ETIMEDOUT:*/
        default:
            Network_GetErrorString(error, errbuf, sizeof(errbuf));
            NET_LOG("Read error %s [%d]", errbuf, error);
            m_State=SS_UNCONNECTED;
            m_Error=GetPS_RESULT(error);
            return m_Error;
        }
    }

    if (res == 0 && len > 0) // EOF - Cleanly closed socket
    {
        *bytesRead=0;
        m_State=SS_UNCONNECTED;
        m_Error=PS_OK;
        return CONNECTION_BROKEN;
    }
    
    *bytesRead=res;
    
    m_pInternal->m_RecvBytes += res;
#if RECORD_GLOBAL_STATS
    GLOBAL_LOCK();
    g_SocketSetInternal.m_GlobalRecvBytes += res;
    GLOBAL_UNLOCK();
#endif
    
    return PS_OK;
}

PS_RESULT CSocketBase::Write(void *buf, uint len, uint *bytesWritten)
{
    int res;
    char errbuf[256];

    res=send(m_pInternal->m_fd, (char *)buf, len, 0);
    if (res < 0)
    {
        *bytesWritten=0;
        int err=Network_LastError;
        switch (err)
        {
        case EWOULDBLOCK:
            return PS_OK;
        /*case ENETDOWN:
        case ENETRESET:
        case ENOTCONN:
        case ESHUTDOWN:
        case ECONNABORTED:
        case ECONNRESET:
        case ETIMEDOUT:
        case EHOSTUNREACH:*/
        default:
            Network_GetErrorString(err, errbuf, sizeof(errbuf));
            NET_LOG("Write error %s [%d]", errbuf, err);
            m_State=SS_UNCONNECTED;
            return CONNECTION_BROKEN;
        }
    }

    *bytesWritten=res;

    m_pInternal->m_SentBytes += res;
#if RECORD_GLOBAL_STATS
    GLOBAL_LOCK();
    g_SocketSetInternal.m_GlobalSentBytes += res;
    GLOBAL_UNLOCK();
#endif
    
    return PS_OK;
}

PS_RESULT CSocketBase::Connect(const CSocketAddress &addr)
{
    int res = connect(m_pInternal->m_fd, (struct sockaddr *)(&addr.m_Union), sizeof(addr));
    NET_LOG("connect returned %d [%d]", res, m_NonBlocking);

    if (res != 0)
    {
        int error=Network_LastError;
        NET_LOG("last error was %d", error);
        if (m_NonBlocking && error == EWOULDBLOCK)
        {
            m_State=SS_CONNECT_STARTED;
        }
        else
        {
            m_State=SS_UNCONNECTED;
            m_Error=GetPS_RESULT(error);
        }
    }
    else
    {
        m_State=SS_CONNECTED;
        m_Error=PS_OK;
    }

    return m_Error;
}

PS_RESULT CSocketBase::Bind(const CSocketAddress &address)
{
    char errBuf[256];
    int res;

    Initialize(address.GetProtocol());

    SetOpMask(READ);

    res=bind(m_pInternal->m_fd, (struct sockaddr *)&address, sizeof(address));
    if (res == -1)
    {
        PS_RESULT ret=PS_FAIL;
        int err=Network_LastError;
        switch (err)
        {
            case EADDRINUSE:
                ret=PORT_IN_USE;
                break;
            case EACCES:
            case EADDRNOTAVAIL:
                ret=INVALID_PORT;
                break;
            default:
                Network_GetErrorString(err, errBuf, sizeof(errBuf));
                LOG(ERROR, LOG_CAT_NET, "CServerSocket::Bind(): bind: %s [%d] => PS_FAIL", errBuf, err);
        }
        m_State=SS_UNCONNECTED;
        m_Error=ret;
        return ret;
    }

    res=listen(m_pInternal->m_fd, 5);
    if (res == -1)
    {
        int err=Network_LastError;
        Network_GetErrorString(err, errBuf, sizeof(errBuf));
        LOG(ERROR, LOG_CAT_NET, "CServerSocket::Bind(): listen: %s [%d] => PS_FAIL", errBuf, err);
        m_State=SS_UNCONNECTED;
        return PS_FAIL;
    }

    m_State=SS_CONNECTED;
    m_Error=PS_OK;
    return PS_OK;
}

PS_RESULT CSocketBase::PreAccept(CSocketAddress &addr)
{
    socklen_t addrLen=sizeof(addr.m_Union);
    int fd=accept(m_pInternal->m_fd, (struct sockaddr *)&addr.m_Union, &addrLen);
    m_pInternal->m_AcceptFd=fd;
    m_pInternal->m_AcceptAddr=addr;
    if (fd != -1)
        return PS_OK;
    else
    {
        PS_RESULT res=GetPS_RESULT(Network_LastError);
        // GetPS_RESULT considers some errors non-failures
        if (res == PS_OK)
            return PS_FAIL;
        else
            return res;
    }
}

CSocketInternal *CSocketBase::Accept()
{
    if (m_pInternal->m_AcceptFd != -1)
    {
        CSocketInternal *pInt=new CSocketInternal();
        pInt->m_fd=m_pInternal->m_AcceptFd;
        pInt->m_RemoteAddr=m_pInternal->m_AcceptAddr;
        
        GLOBAL_LOCK();
        g_SocketSetInternal.m_NumSockets++;     
        GLOBAL_UNLOCK();

        m_pInternal->m_AcceptFd=-1;
        return pInt;
    }
    else
        return NULL;
}

void CSocketBase::Reject()
{
    shutdown(m_pInternal->m_AcceptFd, SHUT_RDWR);
    closesocket(m_pInternal->m_AcceptFd);
}

// UNIX select loop
#if !OS_WIN
// ConnectError is called on a socket the first time it selects as ready
// after the BeginConnect, to check errors on the socket and update the
// connection status information
//
// Returns: true if error callback should be called, false if it should not
bool CSocketBase::ConnectError(CSocketBase *pSocket)
{
    CSocketInternal *pInt=pSocket->m_pInternal;
    uint buf;
    int res;
    
    if (pSocket->m_State==SS_CONNECT_STARTED)
    {
        res=read(pInt->m_fd, &buf, 0);
        // read of zero bytes should be a successful no-op, unless
        // there was an error
        if (res == -1)
        {
            pSocket->m_State=SS_UNCONNECTED;
            PS_RESULT connErr=GetPS_RESULT(errno);
            NET_LOG("Connect error: %s [%d:%s]", connErr, errno, strerror(errno));
            pSocket->m_Error=connErr;
            return true;
        }
        else
        {
            pSocket->m_State=SS_CONNECTED;
            pSocket->m_Error=PS_OK;
        }
    }
    
    return false;
}

// SocketWritable is called whenever a socket selects as writable in the unix
// select loop. This will call the callback after checking for a connect error
// if there's a connect in progress, as well as update the socket's state.
//
// Locking: The global mutex must be held when entering this function, and it
// will be held upon return.
void CSocketBase::SocketWritable(CSocketBase *pSock)
{
    //CSocketInternal *pInt=pSock->m_pInternal;
    bool isConnectError=false;

    if (pSock->m_State != SS_CONNECTED)
        isConnectError=ConnectError(pSock);

    GLOBAL_UNLOCK();

    if (isConnectError)
        pSock->OnClose(pSock->m_Error);
    else
        pSock->OnWrite();

    GLOBAL_LOCK();
}

// SocketReadable is called whenever a socket selects as writable in the unix
// select loop. This will call the callback after checking for a connect error
// if there's a connect in progress, as well as update the socket's state.
//
// Locking: The global mutex must be held when entering this function, and it
// will be held upon return.
void CSocketBase::SocketReadable(CSocketBase *pSock)
{
    bool isError=false;

    if (pSock->m_State == SS_CONNECT_STARTED)
        isError=ConnectError(pSock);
    else if (pSock->m_State == SS_UNCONNECTED)
    {
        // UNCONNECTED sockets don't get callbacks
        // Note that server sockets that are bound have state==SS_CONNECTED
        return;
    }
    else if (pSock->m_State != SS_UNCONNECTED)
    {
        uint nRead;
        errno=0;
        int res=ioctl(pSock->m_pInternal->m_fd, FIONREAD, &nRead);
        // failure, errno=EINVAL means server socket
        // success, nRead != 0 means alive stream socket
        if ((res == -1 && errno != EINVAL) ||
            (res == 0 && nRead == 0))
        {
            NET_LOG("RunWaitLoop:ioctl: Connection broken [%d:%s]", errno, strerror(errno));
            // Don't use API function - we both hold a lock and
            // it is unnecessary to SendWaitLoopUpdate at this
            // stage
            pSock->m_pInternal->m_Ops=0;
            pSock->m_State=SS_UNCONNECTED;
            if (errno)
                pSock->m_Error=GetPS_RESULT(errno);
            else
                pSock->m_Error=PS_OK;
            isError=true;
        }
    }

    GLOBAL_UNLOCK();

    if (isError)
        pSock->OnClose(pSock->m_Error);
    else
        pSock->OnRead();

    GLOBAL_LOCK();
}

void CSocketBase::RunWaitLoop()
{
    int res;

    signal(SIGPIPE, SIG_IGN);

    // Create Control Pipe
    res=pipe(g_SocketSetInternal.m_Pipe);
    if (res != 0)
        return;
    
    // The lock is held upon entry and exit of this loop. There are a few places
    // where the lock is released and then re-acquired: when calling callbacks
    // and when calling select().
    while (true)
    {
        std::map<int, CSocketBase *>::iterator it;
        fd_set rfds;
        fd_set wfds;
        int fd_max=g_SocketSetInternal.m_Pipe[0];

        // Prepare fd_set: Read
        FD_ZERO(&rfds);
        FD_SET(g_SocketSetInternal.m_Pipe[0], &rfds);

        // Prepare fd_set: Write
        FD_ZERO(&wfds);

        it=g_SocketSetInternal.m_HandleMap.begin();
        while (it != g_SocketSetInternal.m_HandleMap.end())
        {
            uint ops=it->second->m_pInternal->m_Ops;

            if (ops && it->first > fd_max)
                fd_max=it->first;
            if (ops & READ)
                FD_SET(it->first, &rfds);
            if (ops & WRITE)
                FD_SET(it->first, &wfds);

            ++it;
        }
        GLOBAL_UNLOCK();

        // select, timeout infinite
        res=select(fd_max+1, &rfds, &wfds, NULL, NULL);

        GLOBAL_LOCK();
        // Check select error
        if (res == -1)
        {
            // It is possible for a socket to be deleted between preparing the
            // fd_sets and actually performing the select - in which case it
            // will fire a Bad file descriptor error. Simply retry.
            if (Network_LastError == EBADF)
                continue;
            perror("CSocketSet::RunWaitLoop(), select");
            continue;
        }

        // Check Control Pipe
        if (FD_ISSET(g_SocketSetInternal.m_Pipe[0], &rfds))
        {
            char bt;
            if (read(g_SocketSetInternal.m_Pipe[0], &bt, 1) == 1)
            {
                if (bt=='q')
                    break;
                else if (bt=='r')
                {
                    // Reload sockets - just skip to the beginning of the loop
                    continue;
                }
            }
            
            FD_CLR(g_SocketSetInternal.m_Pipe[0], &rfds);
        }

        // Go through sockets
        int i=-1;
        while (++i <= fd_max)
        {
            if (!FD_ISSET(i, &rfds) && !FD_ISSET(i, &wfds))
                continue;
            
            it=g_SocketSetInternal.m_HandleMap.find(i);
            if (it == g_SocketSetInternal.m_HandleMap.end())
                continue;
            
            CSocketBase *pSock=it->second;
            
            if (FD_ISSET(i, &wfds))
            {
                SocketWritable(pSock);
            }

            // After the callback is called, we must check if the socket
            // still exists (sockets may delete themselves in the callback)
            it=g_SocketSetInternal.m_HandleMap.find(i);
            if (it == g_SocketSetInternal.m_HandleMap.end())
                continue;

            if (FD_ISSET(i, &rfds))
            {
                SocketReadable(pSock);
            }
        }
    }
    
    // Close control pipe
    close(g_SocketSetInternal.m_Pipe[0]);
    close(g_SocketSetInternal.m_Pipe[1]);

    return;
}

void CSocketBase::SendWaitLoopAbort()
{
    char msg='q';
    write(g_SocketSetInternal.m_Pipe[1], &msg, 1);
}

void CSocketBase::SendWaitLoopUpdate()
{
//  NET_LOG("SendWaitLoopUpdate: fd %d, ops %u\n", m_pInternal->m_fd, m_pInternal->m_Ops);
    char msg='r';
    write(g_SocketSetInternal.m_Pipe[1], &msg, 1);
}

// Windows WindowProc for async event notification
#else   // i.e. #if OS_WIN

void WaitLoop_SocketUpdateProc(int fd, int error, uint event)
{
    GLOBAL_LOCK();
    CSocketBase *pSock=g_SocketSetInternal.m_HandleMap[fd];
    GLOBAL_UNLOCK();
    
    // FIXME What if the fd isn't in the handle map?

    if (error)
    {
        PS_RESULT res=GetPS_RESULT(error);
        pSock->m_Error=res;
        pSock->m_State=SS_UNCONNECTED;
        if (res == PS_FAIL)
            pSock->OnClose(CONNECTION_BROKEN);
        return;
    }

    if (pSock->m_State==SS_CONNECT_STARTED)
    {
        pSock->m_Error=PS_OK;
        pSock->m_State=SS_CONNECTED;
    }

    switch (event)
    {
    case FD_ACCEPT:
    case FD_READ:
        pSock->OnRead();
        break;
    case FD_CONNECT:
    case FD_WRITE:
        pSock->OnWrite();
        break;
    case FD_CLOSE:
        // If FD_CLOSE and error, OnClose has already been called above
        // with the appropriate PS_RESULT
        pSock->m_State=SS_UNCONNECTED;
        pSock->OnClose(PS_OK);
        break;
    }
}

LRESULT WINAPI WaitLoop_WindowProc(HWND hWnd, UINT msg, WPARAM wParam, LPARAM lParam)
{
    //printf("WaitLoop_WindowProc(): Windows message: %d:%d:%d\n", msg, wParam, lParam);
    switch (msg)
    {
        case MSG_SOCKET_READY:
        {
            int event=LOWORD(lParam);
            int error=HIWORD(lParam);
            
            WaitLoop_SocketUpdateProc((int)wParam, error?error-WSABASEERR:0, event);
            return FALSE;
        }
        default:
            return DefWindowProc(hWnd, msg, wParam, lParam);
    }
}

// Locked on entry, must be locked on exit
void CSocketBase::RunWaitLoop()
{
    int ret;
    char errBuf[256];
    MSG msg;

    WNDCLASS wc;
    ATOM atom;

    memset(&wc, 0, sizeof(WNDCLASS));
    wc.lpszClassName="Network Event WindowClass";
    wc.lpfnWndProc=WaitLoop_WindowProc;

    atom=RegisterClass(&wc);
    if (!atom)
    {
        ret=GetLastError();
        Network_GetErrorString(ret, errBuf, sizeof(errBuf));
        NET_LOG("RegisterClass: %s [%d]", errBuf, ret);
        return;
    }

    // Create message window
    g_SocketSetInternal.m_hWnd=CreateWindow((LPCTSTR)atom, "Network Event Window", WS_POPUP, 0, 0, 0, 0, NULL, NULL, NULL, NULL);

    if (!g_SocketSetInternal.m_hWnd)
    {
        ret=GetLastError();
        Network_GetErrorString(ret, errBuf, sizeof(errBuf));
        NET_LOG("CreateWindowEx: %s [%d]", errBuf, ret);
        return;
    }
    
    // If OpMasks where set in another thread before we got this far,
    // WSAAsyncSelect will need to be called again
    std::map<int, CSocketBase *>::iterator it;
    it=g_SocketSetInternal.m_HandleMap.begin();
    while (it != g_SocketSetInternal.m_HandleMap.end())
    {
        it->second->SetOpMask(it->second->GetOpMask());
        ++it;
    }

    NET_LOG("Commencing message loop. hWnd %p", g_SocketSetInternal.m_hWnd);
    GLOBAL_UNLOCK();
    
    while ((ret=GetMessage(&msg, g_SocketSetInternal.m_hWnd, 0, 0))!=0)
    {
        //printf("RunWaitLoop(): Windows message: %d:%d:%d\n", msg.message, msg.wParam, msg.lParam);
        if (ret == -1)
        {
            ret=GetLastError();
            Network_GetErrorString(ret, errBuf, sizeof(errBuf));
            NET_LOG("GetMessage: %s [%d]", errBuf, ret);
        }
        {
            TranslateMessage(&msg);
            DispatchMessage(&msg);
        }
    }

    GLOBAL_LOCK();
    g_SocketSetInternal.m_Thread=0;
    // FIXME Leak: Destroy window
    g_SocketSetInternal.m_hWnd=0;

    NET_LOG("RunWaitLoop returning");
    return;
}

void CSocketBase::SendWaitLoopAbort()
{
    if (g_SocketSetInternal.m_hWnd)
    {
        PostMessage(g_SocketSetInternal.m_hWnd, WM_QUIT, 0, 0);
    }
    else
        NET_LOG("SendWaitLoopUpdate: No WaitLoop Running.");
}

void CSocketBase::SendWaitLoopUpdate()
{
    GLOBAL_LOCK();
    if (g_SocketSetInternal.m_hWnd)
    {
        if(m_NonBlocking == false)
        {
            GLOBAL_UNLOCK();
            WSAAsyncSelect(m_pInternal->m_fd, g_SocketSetInternal.m_hWnd, MSG_SOCKET_READY, 0);
            return;
        }

        long wsaOps=FD_CLOSE;
        if (m_pInternal->m_Ops & READ)
            wsaOps |= FD_READ|FD_ACCEPT;
        if (m_pInternal->m_Ops & WRITE)
            wsaOps |= FD_WRITE|FD_CONNECT;
        GLOBAL_UNLOCK();
        //printf("SendWaitLoopUpdate: %d: %u %x -> %p\n", m_pInternal->m_fd, m_pInternal->m_Ops, wsaOps, g_SocketSetInternal.m_hWnd);
        WSAAsyncSelect(m_pInternal->m_fd, g_SocketSetInternal.m_hWnd, MSG_SOCKET_READY, wsaOps);
    }
    else
    {
        //printf("SendWaitLoopUpdate: No WaitLoop Running.\n");
        GLOBAL_UNLOCK();
    }
}
#endif  // #if OS_WIN

void CSocketBase::AbortWaitLoop()
{
    SendWaitLoopAbort();
//  pthread_join(g_SocketSetInternal.m_Thread);
}

uint CSocketBase::GetOpMask()
{
    return m_pInternal->m_Ops;
}

void CSocketBase::SetOpMask(uint ops)
{
    GLOBAL_LOCK();
    g_SocketSetInternal.m_HandleMap[m_pInternal->m_fd]=this;
    m_pInternal->m_Ops=ops;
    
    /*printf("SetOpMask(fd %d, ops %u) %u\n",
        m_pInternal->m_fd,
        ops,
        g_SocketSetInternal.m_HandleMap[m_pInternal->m_fd]->m_pInternal->m_Ops);*/

    GLOBAL_UNLOCK();
    
    SendWaitLoopUpdate();
}