diff options
author | ian <ian@138bc75d-0d04-0410-961f-82ee72b054a4> | 2011-01-21 18:19:03 +0000 |
---|---|---|
committer | ian <ian@138bc75d-0d04-0410-961f-82ee72b054a4> | 2011-01-21 18:19:03 +0000 |
commit | 48080209fa53b6ea88c86e9f445c431b4cd1e47b (patch) | |
tree | 27d8768fb1d25696d3c40b42535eb5e073c278da /libgo/go/netchan/common.go | |
parent | bff898fbbe4358a4b7e337852df4d6043e0bd3f5 (diff) | |
download | ppe42-gcc-48080209fa53b6ea88c86e9f445c431b4cd1e47b.tar.gz ppe42-gcc-48080209fa53b6ea88c86e9f445c431b4cd1e47b.zip |
Remove the types float and complex.
Update to current version of Go library.
Update testsuite for removed types.
* go-lang.c (go_langhook_init): Omit float_type_size when calling
go_create_gogo.
* go-c.h: Update declaration of go_create_gogo.
git-svn-id: svn+ssh://gcc.gnu.org/svn/gcc/trunk@169098 138bc75d-0d04-0410-961f-82ee72b054a4
Diffstat (limited to 'libgo/go/netchan/common.go')
-rw-r--r-- | libgo/go/netchan/common.go | 154 |
1 files changed, 144 insertions, 10 deletions
diff --git a/libgo/go/netchan/common.go b/libgo/go/netchan/common.go index 87981ca8603..56c0b25199f 100644 --- a/libgo/go/netchan/common.go +++ b/libgo/go/netchan/common.go @@ -38,27 +38,30 @@ const ( payData // user payload follows payAck // acknowledgement; no payload payClosed // channel is now closed + payAckSend // payload has been delivered. ) // A header is sent as a prefix to every transmission. It will be followed by // a request structure, an error structure, or an arbitrary user payload structure. type header struct { - name string - payloadType int - seqNum int64 + Id int + PayloadType int + SeqNum int64 } // Sent with a header once per channel from importer to exporter to report // that it wants to bind to a channel with the specified direction for count -// messages. If count is -1, it means unlimited. +// messages, with space for size buffered values. If count is -1, it means unlimited. type request struct { - count int64 - dir Dir + Name string + Count int64 + Size int + Dir Dir } // Sent with a header to report an error. type error struct { - error string + Error string } // Used to unify management of acknowledgements for import and export. @@ -78,7 +81,7 @@ type chanDir struct { // clients of an exporter and draining outstanding messages. type clientSet struct { mu sync.Mutex // protects access to channel and client maps - chans map[string]*chanDir + names map[string]*chanDir clients map[unackedCounter]bool } @@ -111,7 +114,7 @@ func (ed *encDec) decode(value reflect.Value) os.Error { // Encode a header and payload onto the connection. func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error { ed.encLock.Lock() - hdr.payloadType = payloadType + hdr.PayloadType = payloadType err := ed.enc.Encode(hdr) if err == nil { if payload != nil { @@ -132,7 +135,7 @@ func (cs *clientSet) drain(timeout int64) os.Error { pending := false cs.mu.Lock() // Any messages waiting for a client? - for _, chDir := range cs.chans { + for _, chDir := range cs.names { if chDir.ch.Len() > 0 { pending = true } @@ -189,3 +192,134 @@ func (cs *clientSet) sync(timeout int64) os.Error { } return nil } + +// A netChan represents a channel imported or exported +// on a single connection. Flow is controlled by the receiving +// side by sending payAckSend messages when values +// are delivered into the local channel. +type netChan struct { + *chanDir + name string + id int + size int // buffer size of channel. + + // sender-specific state + ackCh chan bool // buffered with space for all the acks we need + space int // available space. + + // receiver-specific state + sendCh chan reflect.Value // buffered channel of values received from other end. + ed *encDec // so that we can send acks. + count int64 // number of values still to receive. +} + +// Create a new netChan with the given name (only used for +// messages), id, direction, buffer size, and count. +// The connection to the other side is represented by ed. +func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan { + c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count} + if c.dir == Send { + c.ackCh = make(chan bool, size) + c.space = size + } + return c +} + +// Close the channel. +func (nch *netChan) close() { + if nch.dir == Recv { + if nch.sendCh != nil { + // If the sender goroutine is active, close the channel to it. + // It will close nch.ch when it can. + close(nch.sendCh) + } else { + nch.ch.Close() + } + } else { + nch.ch.Close() + close(nch.ackCh) + } +} + +// Send message from remote side to local receiver. +func (nch *netChan) send(val reflect.Value) { + if nch.dir != Recv { + panic("send on wrong direction of channel") + } + if nch.sendCh == nil { + // If possible, do local send directly and ack immediately. + if nch.ch.TrySend(val) { + nch.sendAck() + return + } + // Start sender goroutine to manage delayed delivery of values. + nch.sendCh = make(chan reflect.Value, nch.size) + go nch.sender() + } + if ok := nch.sendCh <- val; !ok { + // TODO: should this be more resilient? + panic("netchan: remote sender sent more values than allowed") + } +} + +// sendAck sends an acknowledgment that a message has left +// the channel's buffer. If the messages remaining to be sent +// will fit in the channel's buffer, then we don't +// need to send an ack. +func (nch *netChan) sendAck() { + if nch.count < 0 || nch.count > int64(nch.size) { + nch.ed.encode(&header{Id: nch.id}, payAckSend, nil) + } + if nch.count > 0 { + nch.count-- + } +} + +// The sender process forwards items from the sending queue +// to the destination channel, acknowledging each item. +func (nch *netChan) sender() { + if nch.dir != Recv { + panic("sender on wrong direction of channel") + } + // When Exporter.Hangup is called, the underlying channel is closed, + // and so we may get a "too many operations on closed channel" error + // if there are outstanding messages in sendCh. + // Make sure that this doesn't panic the whole program. + defer func() { + if r := recover(); r != nil { + // TODO check that r is "too many operations", otherwise re-panic. + } + }() + for v := range nch.sendCh { + nch.ch.Send(v) + nch.sendAck() + } + nch.ch.Close() +} + +// Receive value from local side for sending to remote side. +func (nch *netChan) recv() (val reflect.Value, closed bool) { + if nch.dir != Send { + panic("recv on wrong direction of channel") + } + + if nch.space == 0 { + // Wait for buffer space. + <-nch.ackCh + nch.space++ + } + nch.space-- + return nch.ch.Recv(), nch.ch.Closed() +} + +// acked is called when the remote side indicates that +// a value has been delivered. +func (nch *netChan) acked() { + if nch.dir != Send { + panic("recv on wrong direction of channel") + } + if ok := nch.ackCh <- true; !ok { + panic("netchan: remote receiver sent too many acks") + // TODO: should this be more resilient? + } +} |