diff options
| author | 2022-10-11 08:16:58 +0200 | |
|---|---|---|
| committer | 2022-10-11 08:16:58 +0200 | |
| commit | e8fbc94089b70d9fdd7e3b25ad809dfea454770a (patch) | |
| tree | 43522058393a703b4354a42747fd093f6e3c344f /data.c | |
| parent | Updated TODO. (diff) | |
| download | snac2-e8fbc94089b70d9fdd7e3b25ad809dfea454770a.tar.gz snac2-e8fbc94089b70d9fdd7e3b25ad809dfea454770a.tar.xz snac2-e8fbc94089b70d9fdd7e3b25ad809dfea454770a.zip | |
Unify enqueueing code.
Diffstat (limited to 'data.c')
| -rw-r--r-- | data.c | 74 |
1 files changed, 37 insertions, 37 deletions
| @@ -892,34 +892,45 @@ d_char *history_list(snac *snac) | |||
| 892 | } | 892 | } |
| 893 | 893 | ||
| 894 | 894 | ||
| 895 | void enqueue_input(snac *snac, char *msg, char *req, int retries) | 895 | static int _enqueue_put(char *fn, char *msg) |
| 896 | /* enqueues an input message */ | 896 | /* writes safely to the queue */ |
| 897 | { | 897 | { |
| 898 | int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes")); | 898 | int ret = 1; |
| 899 | xs *ntid = tid(retries * 60 * qrt); | 899 | xs *tfn = xs_fmt("%s.tmp", fn); |
| 900 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | ||
| 901 | xs *tfn = xs_fmt("%s.tmp", fn); | ||
| 902 | FILE *f; | 900 | FILE *f; |
| 903 | 901 | ||
| 904 | if ((f = fopen(tfn, "w")) != NULL) { | 902 | if ((f = fopen(tfn, "w")) != NULL) { |
| 905 | xs *qmsg = xs_dict_new(); | 903 | xs *j = xs_json_dumps_pp(msg, 4); |
| 906 | xs *rn = xs_number_new(retries); | ||
| 907 | xs *j; | ||
| 908 | |||
| 909 | qmsg = xs_dict_append(qmsg, "type", "input"); | ||
| 910 | qmsg = xs_dict_append(qmsg, "object", msg); | ||
| 911 | qmsg = xs_dict_append(qmsg, "req", req); | ||
| 912 | qmsg = xs_dict_append(qmsg, "retries", rn); | ||
| 913 | |||
| 914 | j = xs_json_dumps_pp(qmsg, 4); | ||
| 915 | 904 | ||
| 916 | fwrite(j, strlen(j), 1, f); | 905 | fwrite(j, strlen(j), 1, f); |
| 917 | fclose(f); | 906 | fclose(f); |
| 918 | 907 | ||
| 919 | rename(tfn, fn); | 908 | rename(tfn, fn); |
| 920 | |||
| 921 | snac_debug(snac, 1, xs_fmt("enqueue_input %s", fn)); | ||
| 922 | } | 909 | } |
| 910 | else | ||
| 911 | ret = 0; | ||
| 912 | |||
| 913 | return ret; | ||
| 914 | } | ||
| 915 | |||
| 916 | |||
| 917 | void enqueue_input(snac *snac, char *msg, char *req, int retries) | ||
| 918 | /* enqueues an input message */ | ||
| 919 | { | ||
| 920 | int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes")); | ||
| 921 | xs *ntid = tid(retries * 60 * qrt); | ||
| 922 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | ||
| 923 | xs *qmsg = xs_dict_new(); | ||
| 924 | xs *rn = xs_number_new(retries); | ||
| 925 | |||
| 926 | qmsg = xs_dict_append(qmsg, "type", "input"); | ||
| 927 | qmsg = xs_dict_append(qmsg, "object", msg); | ||
| 928 | qmsg = xs_dict_append(qmsg, "req", req); | ||
| 929 | qmsg = xs_dict_append(qmsg, "retries", rn); | ||
| 930 | |||
| 931 | _enqueue_put(fn, qmsg); | ||
| 932 | |||
| 933 | snac_debug(snac, 1, xs_fmt("enqueue_input %s", fn)); | ||
| 923 | } | 934 | } |
| 924 | 935 | ||
| 925 | 936 | ||
| @@ -934,28 +945,17 @@ void enqueue_output(snac *snac, char *msg, char *actor, int retries) | |||
| 934 | int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes")); | 945 | int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes")); |
| 935 | xs *ntid = tid(retries * 60 * qrt); | 946 | xs *ntid = tid(retries * 60 * qrt); |
| 936 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | 947 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); |
| 937 | xs *tfn = xs_fmt("%s.tmp", fn); | 948 | xs *qmsg = xs_dict_new(); |
| 938 | FILE *f; | 949 | xs *rn = xs_number_new(retries); |
| 939 | 950 | ||
| 940 | if ((f = fopen(tfn, "w")) != NULL) { | 951 | qmsg = xs_dict_append(qmsg, "type", "output"); |
| 941 | xs *qmsg = xs_dict_new(); | 952 | qmsg = xs_dict_append(qmsg, "actor", actor); |
| 942 | xs *rn = xs_number_new(retries); | 953 | qmsg = xs_dict_append(qmsg, "object", msg); |
| 943 | xs *j; | 954 | qmsg = xs_dict_append(qmsg, "retries", rn); |
| 944 | |||
| 945 | qmsg = xs_dict_append(qmsg, "type", "output"); | ||
| 946 | qmsg = xs_dict_append(qmsg, "actor", actor); | ||
| 947 | qmsg = xs_dict_append(qmsg, "object", msg); | ||
| 948 | qmsg = xs_dict_append(qmsg, "retries", rn); | ||
| 949 | |||
| 950 | j = xs_json_dumps_pp(qmsg, 4); | ||
| 951 | |||
| 952 | fwrite(j, strlen(j), 1, f); | ||
| 953 | fclose(f); | ||
| 954 | 955 | ||
| 955 | rename(tfn, fn); | 956 | _enqueue_put(fn, qmsg); |
| 956 | 957 | ||
| 957 | snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", actor, fn, retries)); | 958 | snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", actor, fn, retries)); |
| 958 | } | ||
| 959 | } | 959 | } |
| 960 | 960 | ||
| 961 | 961 | ||