Skip to content

Commit e72b091

Browse files
committed
Initial commit
0 parents  commit e72b091

10 files changed

+981
-0
lines changed

Diff for: .gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.mbp

Diff for: LICENSE

+661
Large diffs are not rendered by default.

Diff for: README.md

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# rss
2+
A [maubot](https://github.com/maubot/maubot) plugin that posts updates from RSS/Atom feeds.

Diff for: base-config.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Feed update interval in minutes
2+
update_interval: 60
3+
# The time to sleep (in seconds) between send requests when broadcasting a new feed entry.
4+
# Set to 0 to disable sleep or -1 to run all requests asynchronously at once.
5+
spam_sleep: 2

Diff for: build.sh

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
zip -9r rss.mbp rss/ base-config.yaml maubot.ini

Diff for: maubot.ini

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[maubot]
2+
ID = xyz.maubot.rss
3+
Version = 0.1.0
4+
License = AGPL-3.0-or-later
5+
Modules = rss
6+
MainClass = RSSBot
7+
ExtraFiles = base-config.yaml

Diff for: rss/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .bot import RSSBot

Diff for: rss/bot.py

+154
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
# rss - A maubot plugin to subscribe to RSS/Atom feeds.
2+
# Copyright (C) 2018 Tulir Asokan
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU Affero General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU Affero General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU Affero General Public License
15+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
from typing import Type, List, Any
17+
from datetime import datetime
18+
from time import mktime
19+
import asyncio
20+
21+
import aiohttp
22+
import feedparser
23+
24+
from maubot import Plugin, MessageEvent
25+
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
26+
from mautrix.types import EventType, MessageType, RoomID
27+
28+
from .db import Database, Feed, Entry
29+
30+
31+
class Config(BaseProxyConfig):
    """Configuration class handed to maubot by ``RSSBot.get_config_class``."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Pass each supported option name to ``helper.copy``, in declaration order."""
        for option in ("update_interval", "spam_sleep"):
            helper.copy(option)
35+
36+
37+
class RSSBot(Plugin):
    """Maubot plugin that polls subscribed feeds and posts new entries to rooms.

    Rooms manage their subscriptions with ``!rss sub``, ``!rss unsub`` and
    ``!rss subs``. A background task re-fetches every subscribed feed each
    ``update_interval`` minutes and sends a notice for every entry that is
    not yet stored in the database.
    """
    db: Database
    poll_task: asyncio.Future
    http: aiohttp.ClientSession

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the config class used for this plugin."""
        return Config

    async def start(self) -> None:
        """Load the config, open the database, register the handler and start polling."""
        self.config.load_and_update()
        self.db = Database(self.request_db_engine())
        self.client.add_event_handler(self.event_handler, EventType.ROOM_MESSAGE)
        # Reuse the Matrix client's HTTP session for feed requests.
        self.http = self.client.api.session

        self.poll_task = asyncio.ensure_future(self.poll_feeds(), loop=self.loop)

    async def stop(self) -> None:
        """Unregister the message handler and cancel the polling task."""
        self.client.remove_event_handler(self.event_handler, EventType.ROOM_MESSAGE)
        self.poll_task.cancel()

    async def poll_feeds(self) -> None:
        """Run the polling loop, logging (instead of raising) when it ends."""
        try:
            await self._poll_feeds()
        except asyncio.CancelledError:
            self.log.debug("Polling stopped")
        except Exception:
            self.log.exception("Failed to poll feeds")

    async def _send_entry(self, room_id: RoomID, text: str, html: str) -> None:
        """Send one notice, logging failures so one bad room doesn't block the others."""
        try:
            await self.client.send_notice(room_id, text=text, html=html)
        except asyncio.CancelledError:
            # CancelledError is an Exception subclass on Python 3.7; never swallow it.
            raise
        except Exception:
            self.log.exception(f"Failed to send feed entry to {room_id}")

    async def _broadcast(self, feed: Feed, entry: Entry, subscriptions: List[RoomID]) -> None:
        """Announce ``entry`` of ``feed`` to every room in ``subscriptions``.

        With a non-negative ``spam_sleep`` the notices are sent one at a time with
        that many seconds of sleep after each; with a negative value they are all
        sent concurrently.
        """
        # Local import: the bare name ``html`` is used as a variable below.
        from html import escape

        text = f"New post in {feed.title}: {entry.title} ({entry.link})"
        # Feed content is untrusted, so escape it before embedding it in markup.
        html = (f"New post in {escape(feed.title)}: "
                f"<a href='{escape(entry.link)}'>{escape(entry.title)}</a>")
        spam_sleep = self.config["spam_sleep"]
        if spam_sleep < 0:
            await asyncio.gather(*(self._send_entry(room_id, text, html)
                                   for room_id in subscriptions))
            return
        for room_id in subscriptions:
            await self._send_entry(room_id, text, html)
            await asyncio.sleep(spam_sleep)

    async def _poll_once(self) -> None:
        """Fetch every subscribed feed once and broadcast entries that are not stored yet."""
        subs = list(self.db.get_feeds())
        if not subs:
            return
        # return_exceptions keeps one unreachable feed from failing the whole round.
        results = await asyncio.gather(*(self.read_feed(feed.url) for feed in subs),
                                       return_exceptions=True)
        for feed, parsed_data in zip(subs, results):
            if isinstance(parsed_data, BaseException):
                self.log.warning(f"Failed to fetch feed {feed.id} ({feed.url}): {parsed_data!r}")
                continue
            try:
                new_entries = {entry.id: entry
                               for entry in self.find_entries(feed.id, parsed_data.entries)}
                # Drop everything that is already in the database.
                for old_entry in self.db.get_entries(feed.id):
                    new_entries.pop(old_entry.id, None)
                self.db.add_entries(new_entries.values())
            except Exception:
                self.log.exception(f"Failed to process feed {feed.id} ({feed.url})")
                continue
            for entry in new_entries.values():
                await self._broadcast(feed, entry, feed.subscriptions)

    async def _poll_feeds(self) -> None:
        """Poll forever, sleeping ``update_interval`` minutes between rounds."""
        self.log.debug("Polling started")
        while True:
            try:
                await self._poll_once()
            except asyncio.CancelledError:
                raise
            except Exception:
                # A failed round must not end polling for good; retry after the interval.
                self.log.exception("Failed to poll feeds, retrying after the update interval")
            await asyncio.sleep(self.config["update_interval"] * 60)

    async def read_feed(self, url: str):
        """Download ``url`` and return the result of ``feedparser.parse`` on its text."""
        async with self.http.get(url) as resp:
            content = await resp.text()
        return feedparser.parse(content)

    @staticmethod
    def find_entries(feed_id: int, entries: List[Any]) -> List[Entry]:
        """Convert parsed feed entries into ``Entry`` tuples for ``feed_id``.

        Missing fields get fallbacks instead of raising: the link stands in for a
        missing ID, the current time for a missing or unparseable publish date, and
        empty strings for the remaining text fields. Entries with neither an ID nor
        a link are skipped, as there is nothing to de-duplicate them by.
        """
        found = []
        for entry in entries:
            link = getattr(entry, "link", "")
            entry_id = getattr(entry, "id", None) or link
            if not entry_id:
                continue
            try:
                # NOTE(review): feedparser documents *_parsed values as UTC while mktime
                # interprets its argument as local time - confirm this is intended.
                date = datetime.fromtimestamp(mktime(entry.published_parsed))
            except (AttributeError, TypeError, ValueError, OverflowError):
                date = datetime.now()
            found.append(Entry(
                feed_id=feed_id,
                id=entry_id,
                date=date,
                title=getattr(entry, "title", ""),
                summary=getattr(entry, "description", ""),
                link=link,
            ))
        return found

    async def event_handler(self, evt: MessageEvent) -> None:
        """Handle ``!rss`` commands sent as text messages.

        Supported commands: ``sub``/``subscribe <feed URL>``,
        ``unsub``/``unsubscribe <feed ID>`` and ``subs``/``subscriptions``.
        Anything else after ``!rss`` gets a usage reply.
        """
        if evt.content.msgtype != MessageType.TEXT:
            return
        body = evt.content.body
        # Require the bare command or a "!rss " prefix so that e.g. "!rssfoo" is ignored.
        if body != "!rss" and not body.startswith("!rss "):
            return

        args = body[len("!rss "):].split()
        cmd, args = (args[0].lower(), args[1:]) if args else ("", [])
        if cmd in ("sub", "subscribe"):
            if not args:
                await evt.reply(f"**Usage:** !rss {cmd} <feed URL>")
                return
            url = " ".join(args)
            feed = self.db.get_feed_by_url(url)
            if not feed:
                try:
                    metadata = await self.read_feed(url)
                except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeError, ValueError) as e:
                    self.log.warning(f"Failed to fetch feed {url}: {e!r}")
                    await evt.reply("Failed to fetch the feed")
                    return
                channel = metadata["channel"]
                title = channel.get("title")
                if not title:
                    await evt.reply("That URL does not look like a valid RSS/Atom feed")
                    return
                feed = self.db.create_feed(url, title, channel.get("description", ""),
                                           channel.get("link", url))
                self.db.add_entries(self.find_entries(feed.id, metadata.entries))
            elif any(sub.id == feed.id for sub in self.db.get_feeds_by_room(evt.room_id)):
                # (feed_id, room_id) is the primary key, so a second insert would fail.
                await evt.reply(f"This room is already subscribed to feed ID {feed.id}: "
                                f"[{feed.title}]({feed.url})")
                return
            self.db.subscribe(feed.id, evt.room_id, evt.sender)
            await evt.reply(f"Subscribed to feed ID {feed.id}: [{feed.title}]({feed.url})")
        elif cmd in ("unsub", "unsubscribe"):
            if not args:
                await evt.reply(f"**Usage:** !rss {cmd} <feed ID>")
                return
            feed = self.db.get_feed_by_id_or_url(" ".join(args))
            if not feed:
                await evt.reply("Feed not found")
                return
            self.db.unsubscribe(feed.id, evt.room_id)
            await evt.reply(f"Unsubscribed from feed ID {feed.id}: [{feed.title}]({feed.url})")
        elif cmd in ("subs", "subscriptions"):
            subscriptions = self.db.get_feeds_by_room(evt.room_id)
            await evt.reply("**Subscriptions in this room:**\n\n"
                            + "\n".join(f"* {feed.id} - [{feed.title}]({feed.url})"
                                        for feed in subscriptions))
        else:
            await evt.reply("**Usage:** !rss <sub/unsub/subs> [params...]")

Diff for: rss/db.py

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# rss - A maubot plugin to subscribe to RSS/Atom feeds.
2+
# Copyright (C) 2018 Tulir Asokan
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU Affero General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU Affero General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU Affero General Public License
15+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
from typing import Iterable, NamedTuple, List, Optional, Dict
17+
from datetime import datetime
18+
19+
from sqlalchemy import (Column, String, Integer, DateTime, Text, ForeignKey,
20+
Table, MetaData,
21+
select, and_, or_)
22+
from sqlalchemy.engine.base import Engine
23+
24+
from mautrix.types import UserID, RoomID
25+
26+
class Feed(NamedTuple):
    """A feed row together with the IDs of the rooms subscribed to it.

    Written in class syntax because the keyword-argument form
    ``NamedTuple("Feed", id=int, ...)`` is deprecated in recent Python versions.
    Field order matches the columns of the ``feed`` table followed by
    ``subscriptions``, so ``Feed(*row, subscriptions=[])`` keeps working.
    """
    id: int
    url: str
    title: str
    subtitle: str
    link: str
    subscriptions: List[RoomID]
28+
class Entry(NamedTuple):
    """A single feed entry as stored in the ``entry`` table.

    Written in class syntax because the keyword-argument form
    ``NamedTuple("Entry", feed_id=int, ...)`` is deprecated in recent Python
    versions. Field order matches the table's columns so ``Entry(*row)`` works.
    """
    feed_id: int
    id: str
    date: datetime
    title: str
    summary: str
    link: str
29+
30+
31+
class Database:
    """SQLAlchemy-backed storage for feeds, room subscriptions and seen entries.

    The tables are declared and created (if missing) when the object is constructed.
    """
    db: Engine
    feed: Table
    subscription: Table
    entry: Table
    version: Table

    def __init__(self, db: Engine) -> None:
        """Declare the schema on a fresh ``MetaData`` and create the tables on ``db``."""
        self.db = db
        metadata = MetaData()
        self.feed = Table("feed", metadata,
                          Column("id", Integer, primary_key=True, autoincrement=True),
                          Column("url", Text, nullable=False, unique=True),
                          Column("title", Text, nullable=False),
                          Column("subtitle", Text, nullable=False),
                          Column("link", Text, nullable=False))
        self.subscription = Table("subscription", metadata,
                                  Column("feed_id", Integer, ForeignKey("feed.id"),
                                         primary_key=True),
                                  Column("room_id", String(255), primary_key=True),
                                  Column("user_id", String(255), nullable=False))
        self.entry = Table("entry", metadata,
                           Column("feed_id", Integer, ForeignKey("feed.id"), primary_key=True),
                           Column("id", String(255), primary_key=True),
                           Column("date", DateTime, nullable=False),
                           Column("title", Text, nullable=False),
                           Column("summary", Text, nullable=False),
                           Column("link", Text, nullable=False))
        self.version = Table("version", metadata,
                             Column("version", Integer, primary_key=True))
        metadata.create_all(db)

    def _first_feed(self, query) -> Optional[Feed]:
        """Run ``query`` and return its first row as a ``Feed``, or None if it has no rows."""
        row = self.db.execute(query).first()
        if row is None:
            return None
        return Feed(*row, subscriptions=[])

    def get_feeds(self) -> Iterable[Feed]:
        """Return every feed that has at least one subscription, with its room IDs filled in."""
        rows = self.db.execute(select([self.feed, self.subscription.c.room_id])
                               .where(self.subscription.c.feed_id == self.feed.c.id))
        feeds: Dict[int, Feed] = {}
        # The join yields one row per (feed, room) pair; fold them into one Feed per ID.
        for feed_id, url, title, subtitle, link, room_id in rows:
            feed = feeds.get(feed_id)
            if feed is None:
                feed = feeds[feed_id] = Feed(feed_id, url, title, subtitle, link,
                                             subscriptions=[])
            feed.subscriptions.append(room_id)
        return feeds.values()

    def get_feeds_by_room(self, room_id: RoomID) -> Iterable[Feed]:
        """Return the feeds ``room_id`` is subscribed to (``subscriptions`` left empty)."""
        rows = self.db.execute(select([self.feed])
                               .where(and_(self.subscription.c.room_id == room_id,
                                           self.subscription.c.feed_id == self.feed.c.id)))
        # Materialized so the result set is fully consumed before returning.
        return [Feed(*row, subscriptions=[]) for row in rows]

    def get_rooms_by_feed(self, feed_id: int) -> Iterable[RoomID]:
        """Return the IDs of the rooms subscribed to ``feed_id``."""
        rows = self.db.execute(select([self.subscription.c.room_id])
                               .where(self.subscription.c.feed_id == feed_id))
        return [row[0] for row in rows]

    def get_entries(self, feed_id: int) -> Iterable[Entry]:
        """Return all stored entries of ``feed_id``."""
        rows = self.db.execute(select([self.entry]).where(self.entry.c.feed_id == feed_id))
        return [Entry(*row) for row in rows]

    def add_entries(self, entries: Iterable[Entry]) -> None:
        """Insert ``entries``; does nothing when the iterable yields no items."""
        # Materialize first: a generator is always truthy, and executing an insert
        # with an empty parameter list must be avoided.
        rows = [entry._asdict() for entry in entries]
        if not rows:
            return
        self.db.execute(self.entry.insert(), rows)

    def get_feed_by_url(self, url: str) -> Optional[Feed]:
        """Return the feed stored for ``url``, or None if there is none."""
        return self._first_feed(select([self.feed]).where(self.feed.c.url == url))

    def get_feed_by_id_or_url(self, identifier: str) -> Optional[Feed]:
        """Return the feed whose URL, or numeric ID, equals ``identifier``; None if no match."""
        condition = self.feed.c.url == identifier
        try:
            feed_id = int(identifier)
        except (TypeError, ValueError):
            # Not a number: only the URL can match, so don't compare the integer
            # ID column against an arbitrary string.
            pass
        else:
            condition = or_(condition, self.feed.c.id == feed_id)
        return self._first_feed(select([self.feed]).where(condition))

    def create_feed(self, url: str, title: str, subtitle: str, link: str) -> Feed:
        """Insert a new feed row and return it as a ``Feed`` carrying the generated ID."""
        res = self.db.execute(self.feed.insert().values(url=url, title=title, subtitle=subtitle,
                                                        link=link))
        return Feed(id=res.inserted_primary_key[0], url=url, title=title, subtitle=subtitle,
                    link=link, subscriptions=[])

    def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None:
        """Record that ``user_id`` subscribed ``room_id`` to ``feed_id``."""
        self.db.execute(self.subscription.insert().values(feed_id=feed_id, room_id=room_id,
                                                          user_id=user_id))

    def unsubscribe(self, feed_id: int, room_id: RoomID) -> None:
        """Delete the subscription of ``room_id`` to ``feed_id``, if any."""
        tbl = self.subscription
        self.db.execute(tbl.delete().where(and_(tbl.c.feed_id == feed_id,
                                                tbl.c.room_id == room_id)))

Diff for: rss/migrations.py

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# rss - A maubot plugin to subscribe to RSS/Atom feeds.
2+
# Copyright (C) 2018 Tulir Asokan
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU Affero General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU Affero General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU Affero General Public License
15+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
from sqlalchemy import select
17+
from sqlalchemy.engine.base import Engine
18+
from alembic.migration import MigrationContext
19+
from alembic.operations import Operations
20+
21+
22+
def run(engine: Engine):
    """Set up an Alembic migration context and operations object on ``engine``.

    No migration steps are performed yet; the function only builds the objects
    that such steps would use. The connection is opened in a ``with`` block so
    it is closed again instead of being left open.
    """
    with engine.connect() as conn:
        ctx = MigrationContext.configure(conn)
        # Unused for now; kept as the entry point for future schema operations.
        op = Operations(ctx)  # noqa: F841

0 commit comments

Comments
 (0)