Finally finished
This commit is contained in:
parent
309b836931
commit
5b213eb72f
10 changed files with 112 additions and 60 deletions
|
@ -1,9 +1,8 @@
|
|||
package subprocess
|
||||
|
||||
/* This file manages incoming messages*/
|
||||
/* This file manages incoming messages, marked with "method":"receive" in the JSON */
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"signal-cli-http/auth"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -22,25 +21,29 @@ type IncomingMessage struct {
|
|||
|
||||
func newIncomingMessage() *IncomingMessage {return &IncomingMessage{}}
|
||||
|
||||
/* Stores incoming messages in a room up to at least 15 minutes old 10,000
|
||||
This is intentionally an array of pointers so cache-clearing is faster */
|
||||
/* Stores unlimited incoming messages up to 5 minutes old.
|
||||
This is intentionally an array of pointers so cache-clearing and appending
|
||||
don't require a copy of unmarshaledJSONMap. */
|
||||
var incomingMessageCache []*IncomingMessage;
|
||||
/* Locks incomingMessageCache */
|
||||
var incomingMessageCacheLock sync.RWMutex;
|
||||
|
||||
/* Here exclusively for testing purposes */
|
||||
func GetIMC() []*IncomingMessage {return incomingMessageCache}
|
||||
|
||||
/* Handler for incoming JSON objects which have "method":"receive" */
|
||||
func handleIncoming(body string, unmarshaledJSONMap map[string]any) (ok bool) {
|
||||
// Check that the message's method is "receive"
|
||||
if val, ok := unmarshaledJSONMap["method"]; !ok || val != "receive" {return false}
|
||||
fmt.Println(body)
|
||||
|
||||
// Create new message structure
|
||||
var newMessage *IncomingMessage = newIncomingMessage();
|
||||
// Using time.Now to ensure that pre/post-dated messages don't have issue
|
||||
// Using time.Now() to ensure that pre/post-dated messages don't have issue
|
||||
newMessage.receivedTime = time.Now();
|
||||
newMessage.body = body;
|
||||
newMessage.unmarshaledJSONMap = unmarshaledJSONMap;
|
||||
|
||||
// Obtain read-write lock
|
||||
// Add message into cache
|
||||
incomingMessageCacheLock.Lock();
|
||||
incomingMessageCache = append(incomingMessageCache, newMessage);
|
||||
incomingMessageCacheLock.Unlock();
|
||||
|
@ -48,26 +51,23 @@ func handleIncoming(body string, unmarshaledJSONMap map[string]any) (ok bool) {
|
|||
}
|
||||
|
||||
/* Handles clearing space in incomingMessageCache */
|
||||
func main() {go cacheClear()}
|
||||
|
||||
func StartCacheClear() {go cacheClear()}
|
||||
/* Runs in an infinite loop to try and clear the cache when needed */
|
||||
func cacheClear() {
|
||||
for {
|
||||
// More than reasonable delay
|
||||
time.Sleep(time.Millisecond);
|
||||
// Only clear when it's 1000 items over
|
||||
time.Sleep(time.Millisecond * 25);
|
||||
// Only attempt to clear when it's 1000 items or more
|
||||
if len(incomingMessageCache) < 1000 {continue}
|
||||
|
||||
incomingMessageCacheLock.Lock();
|
||||
|
||||
// Don't clear anything after this time
|
||||
fifteenMinutesAgo := time.Now().Add(-15 * time.Minute);
|
||||
fiveMinutesAgo := time.Now().Add(time.Minute*(-5));
|
||||
|
||||
// Find index in incomingMessageCache that is closest above 15 minutes ago
|
||||
// Find first index in incomingMessageCache that is at most 15 minutes old
|
||||
i := sort.Search(len(incomingMessageCache), func(i int) bool {
|
||||
return incomingMessageCache[i].receivedTime.After(fifteenMinutesAgo)
|
||||
return incomingMessageCache[i].receivedTime.After(fiveMinutesAgo)
|
||||
})
|
||||
|
||||
incomingMessageCache = incomingMessageCache[i:]
|
||||
|
||||
incomingMessageCacheLock.Unlock();
|
||||
|
@ -75,28 +75,29 @@ func cacheClear() {
|
|||
}
|
||||
|
||||
/* Returns a list of encoded JSON strings from incomingMessageCache that match
|
||||
the filter from */
|
||||
the filter from
|
||||
@return always valid JSON array object. Can be empty. */
|
||||
func GetIncoming(filter map[string]any) string {
|
||||
var list []string;
|
||||
// Create copy of incomingMessageCache for efficency
|
||||
// Create copy of incomingMessageCache as the following loop can be slow
|
||||
incomingMessageCacheLock.RLock();
|
||||
incomingMessageCacheCopy := incomingMessageCache;
|
||||
incomingMessageCacheLock.RUnlock();
|
||||
|
||||
// Create list of messages that match the filter
|
||||
var list []string;
|
||||
for _, message := range incomingMessageCacheCopy {
|
||||
if !auth.Match(message.unmarshaledJSONMap, filter) {continue}
|
||||
fmt.Println(message.body)
|
||||
list = append(list, message.body)
|
||||
}
|
||||
|
||||
// Constructs the JSON string without considering the JSON object
|
||||
var encoded string = "["
|
||||
for index, object := range list {
|
||||
encoded += object
|
||||
if index == len(list) - 1 {continue}
|
||||
encoded += ","
|
||||
}
|
||||
encoded += "]"
|
||||
encoded += "]\n"
|
||||
|
||||
return encoded;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue