diff options
| -rw-r--r-- | activitypub.c | 46 | ||||
| -rw-r--r-- | data.c | 72 | ||||
| -rw-r--r-- | snac.h | 1 |
3 files changed, 82 insertions, 37 deletions
diff --git a/activitypub.c b/activitypub.c index 5d54833..b654beb 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -80,9 +80,10 @@ int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_s | |||
| 80 | { | 80 | { |
| 81 | int status; | 81 | int status; |
| 82 | d_char *response; | 82 | d_char *response; |
| 83 | xs *j_msg = xs_json_dumps_pp(msg, 4); | ||
| 83 | 84 | ||
| 84 | response = http_signed_request(snac, "POST", inbox, | 85 | response = http_signed_request(snac, "POST", inbox, |
| 85 | NULL, msg, strlen(msg), &status, payload, p_size); | 86 | NULL, j_msg, strlen(j_msg), &status, payload, p_size); |
| 86 | 87 | ||
| 87 | free(response); | 88 | free(response); |
| 88 | 89 | ||
| @@ -108,5 +109,48 @@ int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_s | |||
| 108 | status = 400; | 109 | status = 400; |
| 109 | } | 110 | } |
| 110 | 111 | ||
| 112 | snac_log(snac, xs_fmt("send_to_actor %s %d", actor, status)); | ||
| 113 | |||
| 111 | return status; | 114 | return status; |
| 112 | } | 115 | } |
| 116 | |||
| 117 | |||
| 118 | void process_queue(snac *snac) | ||
| 119 | /* processes the queue */ | ||
| 120 | { | ||
| 121 | xs *list; | ||
| 122 | char *p, *fn; | ||
| 123 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); | ||
| 124 | |||
| 125 | list = queue(snac); | ||
| 126 | |||
| 127 | p = list; | ||
| 128 | while (xs_list_iter(&p, &fn)) { | ||
| 129 | xs *q_item = dequeue(snac, fn); | ||
| 130 | char *type; | ||
| 131 | |||
| 132 | if ((type = xs_dict_get(q_item, "type")) == NULL) | ||
| 133 | type = "output"; | ||
| 134 | |||
| 135 | if (strcmp(type, "output") == 0) { | ||
| 136 | int status; | ||
| 137 | char *actor = xs_dict_get(q_item, "actor"); | ||
| 138 | char *msg = xs_dict_get(q_item, "object"); | ||
| 139 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 140 | |||
| 141 | /* deliver */ | ||
| 142 | status = send_to_actor(snac, actor, msg, NULL, 0); | ||
| 143 | |||
| 144 | if (!valid_status(status)) { | ||
| 145 | /* error sending; reenqueue? */ | ||
| 146 | if (retries > queue_retry_max) | ||
| 147 | snac_log(snac, xs_fmt("process_queue giving up %s %d", actor, status)); | ||
| 148 | else { | ||
| 149 | /* reenqueue */ | ||
| 150 | enqueue_output(snac, actor, msg, retries + 1); | ||
| 151 | snac_log(snac, xs_fmt("process_queue requeue %s %d", actor, retries + 1)); | ||
| 152 | } | ||
| 153 | } | ||
| 154 | } | ||
| 155 | } | ||
| 156 | } | ||
| @@ -631,42 +631,6 @@ int is_muted(snac *snac, char *actor) | |||
| 631 | } | 631 | } |
| 632 | 632 | ||
| 633 | 633 | ||
| 634 | void enqueue_output(snac *snac, char *actor, char *msg, int retries) | ||
| 635 | /* enqueues an output message for an actor */ | ||
| 636 | { | ||
| 637 | if (strcmp(actor, snac->actor) == 0) { | ||
| 638 | snac_debug(snac, 1, xs_str_new("enqueue refused to myself")); | ||
| 639 | return; | ||
| 640 | } | ||
| 641 | |||
| 642 | int qrt = xs_number_get(xs_dict_get(srv_config, "query_retry_minutes")); | ||
| 643 | xs *ntid = tid(retries * 60 * qrt); | ||
| 644 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | ||
| 645 | xs *tfn = xs_str_cat(fn, ".tmp"); | ||
| 646 | FILE *f; | ||
| 647 | |||
| 648 | if ((f = fopen(tfn, "w")) != NULL) { | ||
| 649 | xs *qmsg = xs_dict_new(); | ||
| 650 | xs *rn = xs_number_new(retries); | ||
| 651 | xs *j; | ||
| 652 | |||
| 653 | qmsg = xs_dict_append(qmsg, "type", "output"); | ||
| 654 | qmsg = xs_dict_append(qmsg, "actor", actor); | ||
| 655 | qmsg = xs_dict_append(qmsg, "object", msg); | ||
| 656 | qmsg = xs_dict_append(qmsg, "retries", rn); | ||
| 657 | |||
| 658 | j = xs_json_dumps_pp(qmsg, 4); | ||
| 659 | |||
| 660 | fwrite(j, strlen(j), 1, f); | ||
| 661 | fclose(f); | ||
| 662 | |||
| 663 | rename(tfn, fn); | ||
| 664 | |||
| 665 | snac_debug(snac, 2, xs_fmt("enqueue %s %s %d", actor, fn, retries)); | ||
| 666 | } | ||
| 667 | } | ||
| 668 | |||
| 669 | |||
| 670 | d_char *_actor_fn(snac *snac, char *actor) | 634 | d_char *_actor_fn(snac *snac, char *actor) |
| 671 | /* returns the file name for an actor */ | 635 | /* returns the file name for an actor */ |
| 672 | { | 636 | { |
| @@ -745,6 +709,42 @@ int actor_get(snac *snac, char *actor, d_char **data) | |||
| 745 | } | 709 | } |
| 746 | 710 | ||
| 747 | 711 | ||
| 712 | void enqueue_output(snac *snac, char *actor, char *msg, int retries) | ||
| 713 | /* enqueues an output message for an actor */ | ||
| 714 | { | ||
| 715 | if (strcmp(actor, snac->actor) == 0) { | ||
| 716 | snac_debug(snac, 1, xs_str_new("enqueue refused to myself")); | ||
| 717 | return; | ||
| 718 | } | ||
| 719 | |||
| 720 | int qrt = xs_number_get(xs_dict_get(srv_config, "query_retry_minutes")); | ||
| 721 | xs *ntid = tid(retries * 60 * qrt); | ||
| 722 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | ||
| 723 | xs *tfn = xs_str_cat(fn, ".tmp"); | ||
| 724 | FILE *f; | ||
| 725 | |||
| 726 | if ((f = fopen(tfn, "w")) != NULL) { | ||
| 727 | xs *qmsg = xs_dict_new(); | ||
| 728 | xs *rn = xs_number_new(retries); | ||
| 729 | xs *j; | ||
| 730 | |||
| 731 | qmsg = xs_dict_append(qmsg, "type", "output"); | ||
| 732 | qmsg = xs_dict_append(qmsg, "actor", actor); | ||
| 733 | qmsg = xs_dict_append(qmsg, "object", msg); | ||
| 734 | qmsg = xs_dict_append(qmsg, "retries", rn); | ||
| 735 | |||
| 736 | j = xs_json_dumps_pp(qmsg, 4); | ||
| 737 | |||
| 738 | fwrite(j, strlen(j), 1, f); | ||
| 739 | fclose(f); | ||
| 740 | |||
| 741 | rename(tfn, fn); | ||
| 742 | |||
| 743 | snac_debug(snac, 2, xs_fmt("enqueue %s %s %d", actor, fn, retries)); | ||
| 744 | } | ||
| 745 | } | ||
| 746 | |||
| 747 | |||
| 748 | d_char *queue(snac *snac) | 748 | d_char *queue(snac *snac) |
| 749 | /* returns a list with filenames that can be dequeued */ | 749 | /* returns a list with filenames that can be dequeued */ |
| 750 | { | 750 | { |
| @@ -84,3 +84,4 @@ int activitypub_request(snac *snac, char *url, d_char **data); | |||
| 84 | int actor_request(snac *snac, char *actor, d_char **data); | 84 | int actor_request(snac *snac, char *actor, d_char **data); |
| 85 | int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_size); | 85 | int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_size); |
| 86 | int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size); | 86 | int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size); |
| 87 | void process_queue(snac *snac); | ||