I wrote a Python script to convert your Twitter archive into a training dataset for fine-tuning a language model on your personality. It also extracts all your tweets, threads, and media into Markdown files so you can read them or easily make a website. (Link in next tweet)
@deepfates what's the fastest way for me to build a clone of myself like yours? Is there a repo where I can just point it at my text and get a thing I can self host (so I can do this experiment of my older self talks to my young self)
— Defender (on vacation until Jan 3) (@DefenderOfBasic) November 17, 2024
Just download this file and run it on your Twitter archive with Python. It has no dependencies, so you don’t even need to worry about Python environment stuff.
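For example, if you save it as convert_archive.py (the filename is up to you, it's not fixed by the script) and point it at your unzipped archive folder, running it looks like:

python convert_archive.py --archive-path path/to/your/archive --output-dir output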

import argparse
import json
import logging
import os
import re
import shutil
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class MediaFile:
    id: str
    content_type: str
    path: str
    metadata: Dict[str, Any]
@dataclass
class Content:
    id: str
    text: str
    metadata: Dict[str, Any]
    timestamp: str
    parent_id: Optional[str]
    media_files: List[Dict[str, Any]]
    content_source: str
@dataclass
class Thread:
    id: str
    contents: List[Content]
@dataclass
class Message:
    role: Literal["assistant", "user"]
    content: str
# Data extraction functions
def clean_json_string(json_string: str) -> str:
    return re.sub(r'^window\.[^=]+=\s*', '', json_string.strip()).rstrip(';')
def process_file(file_path: str) -> List[Dict[str, Any]]:
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = clean_json_string(f.read())
            results = json.loads(data)
            return results
    except Exception as e:
        logger.warning(f"Error processing file {file_path}: {e}")
        return []
def extract_manifest(file_path: str) -> Dict[str, Any]:
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = clean_json_string(file.read())
            return json.loads(content)
    except json.JSONDecodeError:
        match = re.search(r'window\.__THAR_CONFIG\s*=\s*({.*})', content, re.DOTALL)
        if match:
            return json.loads(match.group(1))
        logger.error(f"Could not parse __THAR_CONFIG in manifest file: {file_path}")
        raise
    except Exception as e:
        logger.error(f"Error extracting manifest from {file_path}: {e}")
        raise
def get_media_files(tweet_id: str, media_folder: str) -> List[str]:
    try:
        all_files = os.listdir(media_folder)
        media_files = [
            f for f in all_files
            if f.startswith(f"{tweet_id}-") and os.path.getsize(os.path.join(media_folder, f)) > 0
        ]
        return media_files
    except Exception as e:
        logger.error(f"Error getting media files for tweet_id {tweet_id}: {e}")
        return []
def get_media_type(filename: str) -> str:
    ext = os.path.splitext(filename)[1].lower()
    if ext in ('.mp4', '.mov'):
        return 'video'
    elif ext in ('.jpg', '.jpeg', '.png', '.gif'):
        return 'photo'
    return 'unknown'
def extract_content(item: Dict[str, Any], content_source: str, media_folder: str) -> List[Content]:
    content_id = item.get('id') or item.get('tweetId')
    text = item.get('text') or item.get('fullText') or item.get('full_text')
    media_files = get_media_files(content_id, media_folder)
    media_file_objects = [{
        'id': f"{content_id}_{os.path.splitext(media_file)[0]}",
        'content_type': get_media_type(media_file),
        'path': os.path.join(media_folder, media_file),
        'metadata': {
            'parent_tweet': item,
            'media_info': item.get('extended_entities', {}).get('media', [])
        }
    } for media_file in media_files]
    return [Content(
        id=content_id,
        text=text,
        metadata=item,
        timestamp=item.get('created_at', ''),
        parent_id=item.get('in_reply_to_status_id', None),
        media_files=media_file_objects,
        content_source=content_source
    )]
def process_file_wrapper(args: Tuple[str, Dict[str, Any], str, str]) -> List[Content]:
    archive_path, file_info, extractor_name, media_folder = args
    file_path = os.path.join(archive_path, file_info['fileName'])
    file_data = process_file(file_path)
    extractor = globals()[extractor_name]  # Get the extractor function by name
    return extractor(file_data, media_folder)
def extract_content_data(archive_path: str, file_info: Dict[str, Any], extractor: Callable, media_folder: str) -> List[Content]:
    try:
        return extractor(file_info['data'], media_folder)
    except Exception as e:
        logger.error(f"Error extracting data with {extractor.__name__}: {e}")
        return []
def extract_data(archive_path: str, type_info: Dict[str, Any], extractor: Callable) -> List[Content]:
    media_folder = os.path.join(archive_path, 'data', 'tweets_media')
    contents = []
    extractor_name = extractor.__name__
    with ProcessPoolExecutor() as executor:
        args_list = [
            (archive_path, file_info, extractor_name, media_folder)
            for file_info in type_info.get('files', [])
        ]
        futures = [executor.submit(process_file_wrapper, args) for args in args_list]
        total_futures = len(futures)
        logger.info(f"Processing {total_futures} files with {extractor_name}")
        completed_count = 0
        for future in as_completed(futures):
            result = future.result()
            if result:
                contents.extend(result)
            completed_count += 1
            if completed_count % 10 == 0 or completed_count == total_futures:
                logger.info(f"Processed {completed_count}/{total_futures} files")
    logger.info(f"Total {extractor_name} extracted: {len(contents)} from {len(type_info.get('files', []))} files")
    return contents
def extract_tweets(file_data: List[Dict[str, Any]], media_folder: str) -> List[Content]:
    logger.info(f"Extracting tweets from {len(file_data)} items")
    contents = [
        content
        for tweet in file_data if 'tweet' in tweet
        for content in extract_content(tweet['tweet'], 'tweet', media_folder)
    ]
    logger.info(f"Extracted {len(contents)} tweet contents")
    return contents
def extract_likes(file_data: List[Dict[str, Any]], media_folder: str) -> List[Content]:
    logger.info(f"Extracting likes from {len(file_data)} items")
    contents = [
        content
        for like in file_data if 'like' in like
        for content in extract_content(like['like'], 'like', media_folder)
    ]
    logger.info(f"Extracted {len(contents)} like contents")
    return contents
def extract_archive_data(archive_path: str) -> Dict[str, List[Content]]:
    try:
        manifest_path = os.path.join(archive_path, 'data', 'manifest.js')
        manifest = extract_manifest(manifest_path)
        data_types = manifest.get('dataTypes', {})
        extractors = {
            'tweets': extract_tweets,
            'like': extract_likes,
            # Add more extractors as needed
        }
        response = {}
        for data_type, extractor in extractors.items():
            if data_type in data_types:
                response[data_type] = extract_data(archive_path, data_types[data_type], extractor)
        return response
    except Exception as e:
        logger.error(f"Error occurred during data extraction: {e}")
        return {}
# Data transformation functions
def clean_text(text: str, entities: Optional[Dict] = None) -> str:
    if entities:
        for url in entities.get('urls', []):
            short_url = url.get('url', '')
            expanded_url = url.get('expanded_url', '')
            if short_url and expanded_url:
                text = text.replace(short_url, expanded_url)
    text = re.sub(r'https://t\.co/\w+', '', text)
    text = re.sub(r'@\w+', '', text)
    text = re.sub(r'#\w+', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def get_all_tweets(data: Dict[str, List[Content]]) -> Dict[str, Content]:
    logger.info("Combining tweets and likes into all_tweets")
    all_tweets = {tweet.id: tweet for tweet in data.get('tweets', []) if tweet.id}
    logger.info(f"Added {len(data.get('tweets', []))} tweets to all_tweets")
    likes = data.get('like', [])
    for like in likes:
        if like.id:
            all_tweets[like.id] = like
        else:
            logger.warning("Like without id encountered and skipped.")
    logger.info(f"Added {len(likes)} likes to all_tweets")
    logger.info(f"Total {len(all_tweets)} tweets/likes in all_tweets")
    return all_tweets
def get_conversation_texts(conversation: List[Content]) -> List[Tuple[str, str]]:
    return [
        (tweet.text, "assistant" if 'full_text' in tweet.metadata else "user")
        for tweet in conversation
        if tweet.text
    ]
def trim_conversation_to_last_assistant(conversation_data: List[Message]) -> List[Message]:
    for i in range(len(conversation_data) - 1, -1, -1):
        if conversation_data[i].role == "assistant":
            return conversation_data[:i+1]
    return []
def get_conversation_data(conversation: List[Content]) -> List[Message]:
    conversation_data = []
    current_role = None
    current_content = []
    for text, role in get_conversation_texts(conversation):
        cleaned_text = clean_text(text)
        if cleaned_text:
            if role != current_role and current_role is not None:
                conversation_data.append(format_message(current_content, current_role))
                current_content = []
            current_role = role
            current_content.append(cleaned_text)
    if current_content:
        conversation_data.append(format_message(current_content, current_role))
    return trim_conversation_to_last_assistant(conversation_data)
def extract_threads_and_conversations(all_tweets: Dict[str, Content]) -> Tuple[List[Thread], List[List[Content]]]:
    """Extract threads and conversations from all tweets."""
    threads = []
    conversations = []
    # Keep track of processed tweet IDs to avoid duplicates
    processed_ids = set()
    for tweet in all_tweets.values():
        if tweet.id in processed_ids:
            continue
        if tweet.content_source == 'tweet' and tweet.parent_id and tweet.parent_id in all_tweets and not tweet.text.startswith('RT'):
            # Initialize the chain
            chain = [tweet]
            current_tweet = tweet
            # Walk up the chain of replies
            while current_tweet.parent_id and current_tweet.parent_id in all_tweets:
                parent_tweet = all_tweets[current_tweet.parent_id]
                chain.append(parent_tweet)
                current_tweet = parent_tweet
                if current_tweet.id in processed_ids:
                    break  # Avoid cycles
            # Mark tweets as processed
            for t in chain:
                processed_ids.add(t.id)
            # Determine if it's a thread or conversation
            if all(t.content_source == 'tweet' for t in chain):
                # This is a thread (user replying to themselves)
                threads.append(Thread(id=tweet.id, contents=list(reversed(chain))))
            else:
                # This is a conversation (user replying to others)
                conversations.append(list(reversed(chain)))
    return threads, conversations
# Data export functions
def process_media_files(media_files: List[Dict[str, Any]], images_folder: str) -> List[str]:
    media_links = []
    for media_file in media_files:
        media_path = media_file.get('path')
        if media_path and os.path.isfile(media_path):
            orig_filename = os.path.basename(media_path)
            new_filename = f"_{orig_filename}"
            dest_path = os.path.join(images_folder, new_filename)
            shutil.copy(media_path, dest_path)
            # Wiki-style image embed; swap in standard Markdown image syntax if you prefer
            media_links.append(f"![[{new_filename}]]")
        else:
            logger.warning(f"Invalid or missing media path: {media_path}")
    return media_links
def save_thread_markdown(thread: Thread, output_dir: str, media_folder: str, images_folder: str):
    if not thread.contents:
        logger.warning("Attempted to save an empty thread.")
        return
    try:
        date_str = thread.contents[0].timestamp
        date = datetime.strptime(date_str, '%a %b %d %H:%M:%S %z %Y').date()
    except ValueError:
        logger.warning(f"Invalid date format: {date_str}")
        date = datetime.today().date()
    frontmatter = f"---\nDate: {date.isoformat()}\n---\n"
    thread_text = []
    for tweet in thread.contents:
        media_links = process_media_files(tweet.media_files, images_folder)
        cleaned_text = clean_text(tweet.text, tweet.metadata.get('entities'))
        combined_text = f"{cleaned_text}\n\n" + '\n\n'.join(media_links)
        thread_text.append(combined_text)
    first_words = ' '.join(thread_text[0].split()[:5])
    sanitized_filename = re.sub(r'[^\w\-_ ]', '', first_words).strip().replace(' ', '_')[:50]
    filename = f"{sanitized_filename}.md"
    file_path = os.path.join(output_dir, filename)
    top_tweet_id = thread.contents[0].id
    top_tweet_link = f"https://twitter.com/i/web/status/{top_tweet_id}"
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(f"{frontmatter}\n\n" + '\n\n'.join(thread_text) + f"\n\n[View original]({top_tweet_link})")
def save_tweets_by_date(all_tweets: Dict[str, Content], threads: List[Thread], output_dir: str, images_folder: str):
    thread_ids = {tweet.id for thread in threads for tweet in thread.contents}
    non_thread_tweets = [
        tweet for tweet_id, tweet in all_tweets.items()
        if tweet_id not in thread_ids
        and not tweet.parent_id
        and tweet.content_source == 'tweet'
        and not tweet.text.startswith('RT')
    ]
    tweets_by_date: Dict[datetime.date, List[Content]] = {}
    for tweet in non_thread_tweets:
        date_str = tweet.timestamp
        if not date_str:
            logger.warning(f"Tweet missing date information: {tweet}")
            continue
        try:
            date = datetime.strptime(date_str, '%a %b %d %H:%M:%S %z %Y').date()
            tweets_by_date.setdefault(date, []).append(tweet)
        except ValueError:
            logger.warning(f"Invalid date format: {date_str}")
    for date, tweets_on_date in tweets_by_date.items():
        filename = f"{date.isoformat()}.md"
        file_path = os.path.join(output_dir, filename)
        tweets_on_date.sort(key=lambda x: x.timestamp)
        content = '\n\n---\n\n'.join(
            f"*{datetime.strptime(tweet.timestamp, '%a %b %d %H:%M:%S %z %Y').strftime('%I:%M %p')}* \n{clean_text(tweet.text, tweet.metadata.get('entities'))}" +
            ''.join(process_media_files(tweet.media_files, images_folder))
            for tweet in tweets_on_date
        )
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
def format_message(content: List[str], role: Literal['assistant', 'user']) -> Message:
    return Message(role=role, content="\n\n".join(content))
def format_conversation(conversation_data: List[Message], system_message: str) -> Dict[str, Any]:
    messages = [{"role": "system", "content": system_message}]
    messages.extend([message.__dict__ for message in conversation_data])
    return {"messages": messages}
def save_conversations_to_jsonl(threads: List[Thread], conversations: List[List[Content]], output_path: str, system_message: str = "You have been uploaded to the internet"):
    logger.info(f"Saving {len(conversations) + len(threads)} conversations to {output_path} in oai format")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        for thread in threads:
            formatted_thread = get_conversation_data(thread.contents)
            if not formatted_thread:
                continue
            formatted_thread = format_conversation(formatted_thread, system_message)
            f.write(json.dumps(formatted_thread) + '\n')
        for conversation in conversations:
            formatted_conv = get_conversation_data(conversation)
            if not formatted_conv:
                continue
            formatted_conv = format_conversation(formatted_conv, system_message)
            f.write(json.dumps(formatted_conv) + '\n')
def main(archive_path: str, output_dir: str, output_formats: List[str], system_message: str):
    data = extract_archive_data(archive_path)
    all_tweets = get_all_tweets(data)
    threads, conversations = extract_threads_and_conversations(all_tweets)
    if 'markdown' in output_formats:
        threads_output_dir = os.path.join(output_dir, 'threads')
        images_folder = os.path.join(output_dir, 'images')
        non_thread_output_dir = os.path.join(output_dir, 'tweets_by_date')
        os.makedirs(threads_output_dir, exist_ok=True)
        os.makedirs(images_folder, exist_ok=True)
        os.makedirs(non_thread_output_dir, exist_ok=True)
        logger.info(f"Saving {len(threads)} threads")
        for i, thread in enumerate(threads, start=1):
            save_thread_markdown(
                thread,
                threads_output_dir,
                os.path.join(archive_path, 'data', 'tweets_media'),
                images_folder
            )
            if i % 10 == 0 or i == len(threads):
                logger.info(f"Saved {i}/{len(threads)} threads")
        save_tweets_by_date(all_tweets, threads, non_thread_output_dir, images_folder)
    if 'oai' in output_formats:
        output_path = os.path.join(output_dir, 'conversations_oai.jsonl')
        save_conversations_to_jsonl(threads, conversations, output_path, system_message)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process Twitter archive")
    parser.add_argument("--archive-path", default="test", help="Path to the Twitter archive directory")
    parser.add_argument("--output-dir", default="output", help="Directory where outputs will be saved")
    parser.add_argument("--output-formats", nargs='+', default=['markdown', 'oai'],
                        help="Output formats to generate (markdown, oai)")
    parser.add_argument("--system-message", default="You have been uploaded to the internet",
                        help="System message for the conversation")
    args = parser.parse_args()
    main(args.archive_path, args.output_dir, args.output_formats, args.system_message)
It’s not perfect. The biggest problem right now is that your “note tweets” (the internal name for Long Tweets) are not included. That’s because the archive format is janky and bad, sorry. There’s a different structure for the note tweet file. 🤷 https://www.theverge.com/23453703/twitter-archive-download-how-to-tweets
This could be fixed by using an actual JavaScript parser in Python, or writing your own. o1-preview wrote one for me but it made the file like twice as long, so I decided to drop it. Honestly, could probably rewrite the whole thing in JS and have a better time of it.

The fine-tuning data includes all your posts and threads. It concatenates threads into longer texts, so your clone should be able to make multi-thought responses. It also includes the text of posts you replied to, if you hit the ♥️ button on them.
This is because the archive saves the text of all your liked posts. Another W for tpot social norms! So for your replies, if we can get the text of the post you replied to, we make that the “user” role and your reply the “assistant” role.
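Concretely, each line of the output JSONL is one conversation in the OpenAI chat format. The user and assistant contents below are placeholders, but the system message is the script's default:

{"messages": [{"role": "system", "content": "You have been uploaded to the internet"}, {"role": "user", "content": "text of a post you liked and replied to"}, {"role": "assistant", "content": "your reply"}]}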
It’s a really simple, blunt instrument right now, but it works. I used this on my own archive to create the AI behind http://deeperfates.com and used similar logic on the Glowfic Project Lawful to create the Infinite Keltham Machine. It could definitely use improvement!
Things you could do to make this better:
- Figure out note tweets
- Remove low-info replies like “lol” (see the sketch after this list)
- Actually scrape Twitter to get full conversations
That last one is because liked tweets don’t have parent IDs, so all the like-reply pairs are separate units right now.
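For the low-info replies item, a minimal sketch of a filter might look like this (the patterns and length cutoff are arbitrary guesses, not something the script does today):

import re

LOW_INFO = re.compile(r"^(lol|lmao|ha+|same|this|nice)[.!?]*$", re.IGNORECASE)

def is_low_info(text: str) -> bool:
    # Treat very short replies and stock reactions as low-info
    cleaned = text.strip()
    return len(cleaned) < 5 or bool(LOW_INFO.match(cleaned))

You could run something like this on cleaned_text inside get_conversation_data and skip anything it flags.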
Another thing I’d like to do at some point: cluster the tweets and label the clusters with an LLM. Then we could do some automatic data improvements like in this excellent post.
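A rough sketch of that idea, assuming you pull in sentence-transformers and scikit-learn (neither is needed by the script above):

from typing import List
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans

def cluster_tweets(texts: List[str], n_clusters: int = 20) -> List[int]:
    # Embed every tweet, then bucket the embeddings into n_clusters groups
    embeddings = SentenceTransformer("all-MiniLM-L6-v2").encode(texts)
    return KMeans(n_clusters=n_clusters).fit_predict(embeddings).tolist()

Sample a few tweets from each cluster, have an LLM name them, and you have labels for filtering or re-weighting the training data.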
If you don’t even want to fine-tune a model, that’s fine too - just do
--output-formats markdown
and you’ll get a folder of text and media files. Threads get one file each, everything else is collected by day. You can explore it like any other vault.
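Using the same hypothetical filename as before, that looks like:

python convert_archive.py --archive-path path/to/your/archive --output-dir output --output-formats markdown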
Make your archive into a website with any of the site generators that take Markdown files. Or, if you don’t want to write any code at all, just use http://blot.im! It’s $4/month and makes a website out of a folder. Not an ad, I just like their service and use it myself.
Good question! This currently just outputs the OpenAI format, because that’s what OpenPipe uses. I like OpenPipe because you can continuously collect the logs and add them to your dataset. Check the next tweet for a script to convert to ShareGPT.
this will be useful — does it use sharegpt format?
— interstellarninja (@intrstllrninja) November 17, 2024
Here’s the script. I just pulled it out of a larger codebase, so it’s not very refined, but it should work. Run it like
python convert_oai_to_sharegpt.py conversations_oai.jsonl conversations_sharegpt.jsonl
import json
import argparse
def convert_oai_to_sharegpt(input_file: str, output_file: str):
    # ShareGPT uses "human"/"gpt" in place of "user"/"assistant"
    role_map = {"user": "human", "assistant": "gpt"}
    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        for line in infile:
            conversation = json.loads(line)
            # Adjust the format for ShareGPT, skipping system messages
            sharegpt_messages = [
                {"from": role_map.get(message["role"], message["role"]),
                 "value": message["content"]}
                for message in conversation["messages"]
                if message.get("role") != "system"
            ]
            # One conversation per JSONL line
            outfile.write(json.dumps({"conversations": sharegpt_messages}) + '\n')
def main():
    parser = argparse.ArgumentParser(description='Convert conversations_oai.jsonl to conversations_sharegpt.jsonl')
    parser.add_argument('input_file', type=str, help='Input JSONL file (conversations_oai.jsonl)')
    parser.add_argument('output_file', type=str, help='Output JSONL file (conversations_sharegpt.jsonl)')
    args = parser.parse_args()
    convert_oai_to_sharegpt(args.input_file, args.output_file)
    print(f'Converted {args.input_file} to {args.output_file} successfully!')
if __name__ == '__main__':
    main()
