diff options
| -rw-r--r-- | activitypub.c | 21 | ||||
| -rw-r--r-- | data.c | 13 | ||||
| -rw-r--r-- | snac.h | 2 |
3 files changed, 24 insertions, 12 deletions
diff --git a/activitypub.c b/activitypub.c index 20c734b..8127e8d 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -557,7 +557,7 @@ d_char *msg_note(snac *snac, char *content, char *rcpts, char *in_reply_to) | |||
| 557 | 557 | ||
| 558 | /** queues **/ | 558 | /** queues **/ |
| 559 | 559 | ||
| 560 | void process_message(snac *snac, char *msg, char *req) | 560 | int process_message(snac *snac, char *msg, char *req) |
| 561 | /* processes an ActivityPub message from the input queue */ | 561 | /* processes an ActivityPub message from the input queue */ |
| 562 | { | 562 | { |
| 563 | /* actor and type exist, were checked previously */ | 563 | /* actor and type exist, were checked previously */ |
| @@ -575,10 +575,8 @@ void process_message(snac *snac, char *msg, char *req) | |||
| 575 | 575 | ||
| 576 | /* bring the actor */ | 576 | /* bring the actor */ |
| 577 | if (!valid_status(actor_request(snac, actor, &actor_o))) { | 577 | if (!valid_status(actor_request(snac, actor, &actor_o))) { |
| 578 | /* error: re-enqueue to try later */ | ||
| 579 | enqueue_input(snac, msg, req); | ||
| 580 | snac_log(snac, xs_fmt("error requesting actor %s -- retry later", actor)); | 578 | snac_log(snac, xs_fmt("error requesting actor %s -- retry later", actor)); |
| 581 | return; | 579 | return 0; |
| 582 | } | 580 | } |
| 583 | 581 | ||
| 584 | /* check the signature */ | 582 | /* check the signature */ |
| @@ -690,6 +688,8 @@ void process_message(snac *snac, char *msg, char *req) | |||
| 690 | */ | 688 | */ |
| 691 | else | 689 | else |
| 692 | snac_debug(snac, 1, xs_fmt("process_message type '%s' ignored", type)); | 690 | snac_debug(snac, 1, xs_fmt("process_message type '%s' ignored", type)); |
| 691 | |||
| 692 | return 1; | ||
| 693 | } | 693 | } |
| 694 | 694 | ||
| 695 | 695 | ||
| @@ -742,8 +742,17 @@ void process_queue(snac *snac) | |||
| 742 | /* process the message */ | 742 | /* process the message */ |
| 743 | char *msg = xs_dict_get(q_item, "object"); | 743 | char *msg = xs_dict_get(q_item, "object"); |
| 744 | char *req = xs_dict_get(q_item, "req"); | 744 | char *req = xs_dict_get(q_item, "req"); |
| 745 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 745 | 746 | ||
| 746 | process_message(snac, msg, req); | 747 | if (!process_message(snac, msg, req)) { |
| 748 | if (retries > queue_retry_max) | ||
| 749 | snac_log(snac, xs_fmt("process_queue input giving up")); | ||
| 750 | else { | ||
| 751 | /* reenqueue */ | ||
| 752 | enqueue_input(snac, msg, req, retries + 1); | ||
| 753 | snac_log(snac, xs_fmt("process_queue input requeue %d", retries + 1)); | ||
| 754 | } | ||
| 755 | } | ||
| 747 | } | 756 | } |
| 748 | } | 757 | } |
| 749 | } | 758 | } |
| @@ -907,7 +916,7 @@ int activitypub_post_handler(d_char *req, char *q_path, | |||
| 907 | } | 916 | } |
| 908 | 917 | ||
| 909 | if (valid_status(status)) { | 918 | if (valid_status(status)) { |
| 910 | enqueue_input(&snac, msg, req); | 919 | enqueue_input(&snac, msg, req, 0); |
| 911 | *ctype = "application/activity+json"; | 920 | *ctype = "application/activity+json"; |
| 912 | } | 921 | } |
| 913 | 922 | ||
| @@ -831,21 +831,24 @@ int static_get(snac *snac, char *id, d_char **data, int *size) | |||
| 831 | } | 831 | } |
| 832 | 832 | ||
| 833 | 833 | ||
| 834 | void enqueue_input(snac *snac, char *msg, char *req) | 834 | void enqueue_input(snac *snac, char *msg, char *req, int retries) |
| 835 | /* enqueues an input message */ | 835 | /* enqueues an input message */ |
| 836 | { | 836 | { |
| 837 | xs *ntid = tid(0); | 837 | int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes")); |
| 838 | xs *ntid = tid(retries * 60 * qrt); | ||
| 838 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | 839 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); |
| 839 | xs *tfn = xs_fmt("%s.tmp", fn); | 840 | xs *tfn = xs_fmt("%s.tmp", fn); |
| 840 | FILE *f; | 841 | FILE *f; |
| 841 | 842 | ||
| 842 | if ((f = fopen(tfn, "w")) != NULL) { | 843 | if ((f = fopen(tfn, "w")) != NULL) { |
| 843 | xs *qmsg = xs_dict_new(); | 844 | xs *qmsg = xs_dict_new(); |
| 845 | xs *rn = xs_number_new(retries); | ||
| 844 | xs *j; | 846 | xs *j; |
| 845 | 847 | ||
| 846 | qmsg = xs_dict_append(qmsg, "type", "input"); | 848 | qmsg = xs_dict_append(qmsg, "type", "input"); |
| 847 | qmsg = xs_dict_append(qmsg, "object", msg); | 849 | qmsg = xs_dict_append(qmsg, "object", msg); |
| 848 | qmsg = xs_dict_append(qmsg, "req", req); | 850 | qmsg = xs_dict_append(qmsg, "req", req); |
| 851 | qmsg = xs_dict_append(qmsg, "retries", rn); | ||
| 849 | 852 | ||
| 850 | j = xs_json_dumps_pp(qmsg, 4); | 853 | j = xs_json_dumps_pp(qmsg, 4); |
| 851 | 854 | ||
| @@ -83,7 +83,7 @@ int actor_get(snac *snac, char *actor, d_char **data); | |||
| 83 | 83 | ||
| 84 | int static_get(snac *snac, char *id, d_char **data, int *size); | 84 | int static_get(snac *snac, char *id, d_char **data, int *size); |
| 85 | 85 | ||
| 86 | void enqueue_input(snac *snac, char *msg, char *req); | 86 | void enqueue_input(snac *snac, char *msg, char *req, int retries); |
| 87 | void enqueue_output(snac *snac, char *msg, char *actor, int retries); | 87 | void enqueue_output(snac *snac, char *msg, char *actor, int retries); |
| 88 | 88 | ||
| 89 | d_char *queue(snac *snac); | 89 | d_char *queue(snac *snac); |