Commit f3b6c7fcf8fca857b3c3ba0aa5b3a06d7ce0ac37

Authored by Mark McLoughlin
1 parent e94667b9

net: add qemu_send_packet_async()

Add a qemu_send_packet() variant which will queue up the packet
if it cannot be sent when all client queues are full. It later
invokes the supplied callback when the packet has been sent.

If qemu_send_packet_async() returns zero, the caller is expected
to not send any more packets until the queued packet has been
sent.

Packets are queued iff a receive() handler returns zero (indicating
queue full) and the caller has provided a sent notification callback
(indicating it will stop and start its own queue).

We need the packet sending API to support queueing because:

  - a sending client should process all available packets in one go
    (e.g. virtio-net emptying its tx ring)

  - a receiving client may not be able to handle the packet
    (e.g. -EAGAIN from write() to tapfd)

  - the sending client could detect this condition in advance
    (e.g. by select() for writable on tapfd)

  - that's too much overhead (e.g. a select() call per packet)

  - therefore the sending client must handle the condition by
    dropping the packet or queueing it

  - dropping packets is poor form; we should queue.

However, we don't want queueing to be completely transparent. We
want the sending client to stop sending packets as soon as a
packet is queued. This allows the sending client to be throttled
by the receiver.

Signed-off-by: Mark McLoughlin <markmc@redhat.com>
Showing 2 changed files with 70 additions and 21 deletions
@@ -439,19 +439,32 @@ qemu_deliver_packet(VLANClientState *sender, const uint8_t *buf, int size) @@ -439,19 +439,32 @@ qemu_deliver_packet(VLANClientState *sender, const uint8_t *buf, int size)
439 return ret; 439 return ret;
440 } 440 }
441 441
442 -static void qemu_flush_queued_packets(VLANClientState *vc) 442 +void qemu_flush_queued_packets(VLANClientState *vc)
443 { 443 {
444 VLANPacket *packet; 444 VLANPacket *packet;
445 445
446 while ((packet = vc->vlan->send_queue) != NULL) { 446 while ((packet = vc->vlan->send_queue) != NULL) {
  447 + int ret;
  448 +
447 vc->vlan->send_queue = packet->next; 449 vc->vlan->send_queue = packet->next;
448 - qemu_deliver_packet(packet->sender, packet->data, packet->size); 450 +
  451 + ret = qemu_deliver_packet(packet->sender, packet->data, packet->size);
  452 + if (ret == 0 && packet->sent_cb != NULL) {
  453 + packet->next = vc->vlan->send_queue;
  454 + vc->vlan->send_queue = packet;
  455 + break;
  456 + }
  457 +
  458 + if (packet->sent_cb)
  459 + packet->sent_cb(packet->sender);
  460 +
449 qemu_free(packet); 461 qemu_free(packet);
450 } 462 }
451 } 463 }
452 464
453 -static void  
454 -qemu_enqueue_packet(VLANClientState *sender, const uint8_t *buf, int size) 465 +static void qemu_enqueue_packet(VLANClientState *sender,
  466 + const uint8_t *buf, int size,
  467 + NetPacketSent *sent_cb)
455 { 468 {
456 VLANPacket *packet; 469 VLANPacket *packet;
457 470
@@ -459,28 +472,45 @@ qemu_enqueue_packet(VLANClientState *sender, const uint8_t *buf, int size) @@ -459,28 +472,45 @@ qemu_enqueue_packet(VLANClientState *sender, const uint8_t *buf, int size)
459 packet->next = sender->vlan->send_queue; 472 packet->next = sender->vlan->send_queue;
460 packet->sender = sender; 473 packet->sender = sender;
461 packet->size = size; 474 packet->size = size;
  475 + packet->sent_cb = sent_cb;
462 memcpy(packet->data, buf, size); 476 memcpy(packet->data, buf, size);
463 sender->vlan->send_queue = packet; 477 sender->vlan->send_queue = packet;
464 } 478 }
465 479
466 -void qemu_send_packet(VLANClientState *vc, const uint8_t *buf, int size) 480 +ssize_t qemu_send_packet_async(VLANClientState *sender,
  481 + const uint8_t *buf, int size,
  482 + NetPacketSent *sent_cb)
467 { 483 {
468 - VLANState *vlan = vc->vlan; 484 + int ret;
469 485
470 - if (vc->link_down)  
471 - return; 486 + if (sender->link_down) {
  487 + return size;
  488 + }
472 489
473 #ifdef DEBUG_NET 490 #ifdef DEBUG_NET
474 - printf("vlan %d send:\n", vlan->id); 491 + printf("vlan %d send:\n", sender->vlan->id);
475 hex_dump(stdout, buf, size); 492 hex_dump(stdout, buf, size);
476 #endif 493 #endif
477 - if (vlan->delivering) {  
478 - qemu_enqueue_packet(vc, buf, size);  
479 - return; 494 +
  495 + if (sender->vlan->delivering) {
  496 + qemu_enqueue_packet(sender, buf, size, NULL);
  497 + return size;
480 } 498 }
481 499
482 - qemu_deliver_packet(vc, buf, size);  
483 - qemu_flush_queued_packets(vc); 500 + ret = qemu_deliver_packet(sender, buf, size);
  501 + if (ret == 0 && sent_cb != NULL) {
  502 + qemu_enqueue_packet(sender, buf, size, sent_cb);
  503 + return 0;
  504 + }
  505 +
  506 + qemu_flush_queued_packets(sender);
  507 +
  508 + return ret;
  509 +}
  510 +
  511 +void qemu_send_packet(VLANClientState *vc, const uint8_t *buf, int size)
  512 +{
  513 + qemu_send_packet_async(vc, buf, size, NULL);
484 } 514 }
485 515
486 static ssize_t vc_sendv_compat(VLANClientState *vc, const struct iovec *iov, 516 static ssize_t vc_sendv_compat(VLANClientState *vc, const struct iovec *iov,
@@ -498,9 +528,7 @@ static ssize_t vc_sendv_compat(VLANClientState *vc, const struct iovec *iov, @@ -498,9 +528,7 @@ static ssize_t vc_sendv_compat(VLANClientState *vc, const struct iovec *iov,
498 offset += len; 528 offset += len;
499 } 529 }
500 530
501 - vc->receive(vc, buffer, offset);  
502 -  
503 - return offset; 531 + return vc->receive(vc, buffer, offset);
504 } 532 }
505 533
506 static ssize_t calc_iov_length(const struct iovec *iov, int iovcnt) 534 static ssize_t calc_iov_length(const struct iovec *iov, int iovcnt)
@@ -548,7 +576,8 @@ static int qemu_deliver_packet_iov(VLANClientState *sender, @@ -548,7 +576,8 @@ static int qemu_deliver_packet_iov(VLANClientState *sender,
548 } 576 }
549 577
550 static ssize_t qemu_enqueue_packet_iov(VLANClientState *sender, 578 static ssize_t qemu_enqueue_packet_iov(VLANClientState *sender,
551 - const struct iovec *iov, int iovcnt) 579 + const struct iovec *iov, int iovcnt,
  580 + NetPacketSent *sent_cb)
552 { 581 {
553 VLANPacket *packet; 582 VLANPacket *packet;
554 size_t max_len = 0; 583 size_t max_len = 0;
@@ -559,6 +588,7 @@ static ssize_t qemu_enqueue_packet_iov(VLANClientState *sender, @@ -559,6 +588,7 @@ static ssize_t qemu_enqueue_packet_iov(VLANClientState *sender,
559 packet = qemu_malloc(sizeof(VLANPacket) + max_len); 588 packet = qemu_malloc(sizeof(VLANPacket) + max_len);
560 packet->next = sender->vlan->send_queue; 589 packet->next = sender->vlan->send_queue;
561 packet->sender = sender; 590 packet->sender = sender;
  591 + packet->sent_cb = sent_cb;
562 packet->size = 0; 592 packet->size = 0;
563 593
564 for (i = 0; i < iovcnt; i++) { 594 for (i = 0; i < iovcnt; i++) {
@@ -573,8 +603,9 @@ static ssize_t qemu_enqueue_packet_iov(VLANClientState *sender, @@ -573,8 +603,9 @@ static ssize_t qemu_enqueue_packet_iov(VLANClientState *sender,
573 return packet->size; 603 return packet->size;
574 } 604 }
575 605
576 -ssize_t qemu_sendv_packet(VLANClientState *sender, const struct iovec *iov,  
577 - int iovcnt) 606 +ssize_t qemu_sendv_packet_async(VLANClientState *sender,
  607 + const struct iovec *iov, int iovcnt,
  608 + NetPacketSent *sent_cb)
578 { 609 {
579 int ret; 610 int ret;
580 611
@@ -583,16 +614,26 @@ ssize_t qemu_sendv_packet(VLANClientState *sender, const struct iovec *iov, @@ -583,16 +614,26 @@ ssize_t qemu_sendv_packet(VLANClientState *sender, const struct iovec *iov,
583 } 614 }
584 615
585 if (sender->vlan->delivering) { 616 if (sender->vlan->delivering) {
586 - return qemu_enqueue_packet_iov(sender, iov, iovcnt); 617 + return qemu_enqueue_packet_iov(sender, iov, iovcnt, NULL);
587 } 618 }
588 619
589 ret = qemu_deliver_packet_iov(sender, iov, iovcnt); 620 ret = qemu_deliver_packet_iov(sender, iov, iovcnt);
  621 + if (ret == 0 && sent_cb != NULL) {
  622 + qemu_enqueue_packet_iov(sender, iov, iovcnt, sent_cb);
  623 + return 0;
  624 + }
590 625
591 qemu_flush_queued_packets(sender); 626 qemu_flush_queued_packets(sender);
592 627
593 return ret; 628 return ret;
594 } 629 }
595 630
  631 +ssize_t
  632 +qemu_sendv_packet(VLANClientState *vc, const struct iovec *iov, int iovcnt)
  633 +{
  634 + return qemu_sendv_packet_async(vc, iov, iovcnt, NULL);
  635 +}
  636 +
596 static void config_error(Monitor *mon, const char *fmt, ...) 637 static void config_error(Monitor *mon, const char *fmt, ...)
597 { 638 {
598 va_list ap; 639 va_list ap;
@@ -32,10 +32,13 @@ struct VLANClientState { @@ -32,10 +32,13 @@ struct VLANClientState {
32 32
33 typedef struct VLANPacket VLANPacket; 33 typedef struct VLANPacket VLANPacket;
34 34
  35 +typedef void (NetPacketSent) (VLANClientState *);
  36 +
35 struct VLANPacket { 37 struct VLANPacket {
36 struct VLANPacket *next; 38 struct VLANPacket *next;
37 VLANClientState *sender; 39 VLANClientState *sender;
38 int size; 40 int size;
  41 + NetPacketSent *sent_cb;
39 uint8_t data[0]; 42 uint8_t data[0];
40 }; 43 };
41 44
@@ -62,7 +65,12 @@ VLANClientState *qemu_find_vlan_client(VLANState *vlan, void *opaque); @@ -62,7 +65,12 @@ VLANClientState *qemu_find_vlan_client(VLANState *vlan, void *opaque);
62 int qemu_can_send_packet(VLANClientState *vc); 65 int qemu_can_send_packet(VLANClientState *vc);
63 ssize_t qemu_sendv_packet(VLANClientState *vc, const struct iovec *iov, 66 ssize_t qemu_sendv_packet(VLANClientState *vc, const struct iovec *iov,
64 int iovcnt); 67 int iovcnt);
  68 +ssize_t qemu_sendv_packet_async(VLANClientState *vc, const struct iovec *iov,
  69 + int iovcnt, NetPacketSent *sent_cb);
65 void qemu_send_packet(VLANClientState *vc, const uint8_t *buf, int size); 70 void qemu_send_packet(VLANClientState *vc, const uint8_t *buf, int size);
  71 +ssize_t qemu_send_packet_async(VLANClientState *vc, const uint8_t *buf,
  72 + int size, NetPacketSent *sent_cb);
  73 +void qemu_flush_queued_packets(VLANClientState *vc);
66 void qemu_format_nic_info_str(VLANClientState *vc, uint8_t macaddr[6]); 74 void qemu_format_nic_info_str(VLANClientState *vc, uint8_t macaddr[6]);
67 void qemu_check_nic_model(NICInfo *nd, const char *model); 75 void qemu_check_nic_model(NICInfo *nd, const char *model);
68 void qemu_check_nic_model_list(NICInfo *nd, const char * const *models, 76 void qemu_check_nic_model_list(NICInfo *nd, const char * const *models,